mirror of
https://github.com/monero-project/monero.git
synced 2026-04-28 11:53:17 -07:00
@@ -128,7 +128,7 @@ namespace net_utils
|
||||
|
||||
void start_handshake();
|
||||
void start_read();
|
||||
void finish_read(size_t bytes_transferred);
|
||||
void handle_read(size_t bytes_transferred);
|
||||
void start_write();
|
||||
void start_shutdown();
|
||||
void cancel_socket();
|
||||
@@ -326,7 +326,7 @@ namespace net_utils
|
||||
//----------------- i_service_endpoint ---------------------
|
||||
virtual bool do_send(byte_slice message); ///< (see do_send from i_service_endpoint)
|
||||
virtual bool send_done();
|
||||
virtual bool close();
|
||||
virtual bool close(const bool wait_for_shutdown);
|
||||
virtual bool call_run_once_service_io();
|
||||
virtual bool request_callback();
|
||||
virtual io_context_t& get_io_context();
|
||||
@@ -379,7 +379,7 @@ namespace net_utils
|
||||
bool timed_wait_server_stop(uint64_t wait_mseconds);
|
||||
|
||||
/// Stop the server.
|
||||
void send_stop_signal();
|
||||
void send_stop_signal(std::function<void()> close_all_connections = [](){});
|
||||
|
||||
bool is_stop_signal_sent() const noexcept { return m_stop_signal_sent; };
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/uuid/random_generator.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <boost/chrono.hpp>
|
||||
#include <boost/utility/value_init.hpp>
|
||||
#include <boost/asio/bind_executor.hpp>
|
||||
@@ -247,7 +248,7 @@ namespace net_utils
|
||||
)
|
||||
) {
|
||||
m_state.ssl.enabled = false;
|
||||
finish_read(bytes_transferred);
|
||||
handle_read(bytes_transferred);
|
||||
}
|
||||
else {
|
||||
m_state.ssl.detected = true;
|
||||
@@ -385,7 +386,7 @@ namespace net_utils
|
||||
m_conn_context.m_recv_cnt += bytes_transferred;
|
||||
start_timer(get_timeout_from_bytes_read(bytes_transferred), true);
|
||||
}
|
||||
finish_read(bytes_transferred);
|
||||
handle_read(bytes_transferred);
|
||||
}
|
||||
};
|
||||
if (!m_state.ssl.enabled)
|
||||
@@ -412,7 +413,7 @@ namespace net_utils
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void connection<T>::finish_read(size_t bytes_transferred)
|
||||
void connection<T>::handle_read(size_t bytes_transferred)
|
||||
{
|
||||
// Post handle_recv to a separate `strand_`, distinct from `m_strand`
|
||||
// which is listening for reads/writes. This avoids a circular dep.
|
||||
@@ -739,6 +740,7 @@ namespace net_utils
|
||||
}
|
||||
else
|
||||
m_state.status = status_t::WASTED;
|
||||
m_state.condition.notify_all();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
@@ -797,6 +799,7 @@ namespace net_utils
|
||||
m_state.socket.connected = false;
|
||||
}
|
||||
m_state.status = status_t::WASTED;
|
||||
m_state.condition.notify_all();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
@@ -1115,7 +1118,7 @@ namespace net_utils
|
||||
template<typename T>
|
||||
bool connection<T>::cancel()
|
||||
{
|
||||
return close();
|
||||
return close(false);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
@@ -1131,12 +1134,34 @@ namespace net_utils
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
bool connection<T>::close()
|
||||
bool connection<T>::close(const bool wait_for_shutdown)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(m_state.lock);
|
||||
if (m_state.status != status_t::RUNNING)
|
||||
return false;
|
||||
terminate_async();
|
||||
|
||||
// Sometimes we do *not* want to wait for the connection to shut down because for example handle_recv might try to
|
||||
// close the connection when handling a request. But handle_recv can't complete the shutdown sequence because
|
||||
// handle_read is set to true. So, in that case we call terminate_async and return here.
|
||||
if (!wait_for_shutdown)
|
||||
return true;
|
||||
|
||||
// Sometimes we *do* want to wait for the connection to shut down for example when stopping the server. When
|
||||
// stopping the server, we don't want the io_context to stop before the shutdown sequence completes, since we
|
||||
// execute terminate inside m_strand. So we wait for the connection's shutdown sequence to complete before stopping
|
||||
// the io_context.
|
||||
MDEBUG("Waiting for connection " << m_conn_context.m_connection_id << " to shutdown, current state: " << m_state.status);
|
||||
m_state.condition.wait(
|
||||
m_state.lock,
|
||||
[this]{
|
||||
return (
|
||||
m_state.status == status_t::TERMINATED || m_state.status == status_t::WASTED
|
||||
);
|
||||
}
|
||||
);
|
||||
MDEBUG("Shut down connection " << m_conn_context.m_connection_id);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1543,7 +1568,7 @@ namespace net_utils
|
||||
}
|
||||
//---------------------------------------------------------------------------------
|
||||
template<class t_protocol_handler>
|
||||
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
|
||||
void boosted_tcp_server<t_protocol_handler>::send_stop_signal(std::function<void()> close_all_connections)
|
||||
{
|
||||
m_stop_signal_sent = true;
|
||||
typename connection<t_protocol_handler>::shared_state *state = static_cast<typename connection<t_protocol_handler>::shared_state*>(m_state.get());
|
||||
@@ -1556,7 +1581,13 @@ namespace net_utils
|
||||
}
|
||||
connections_.clear();
|
||||
connections_mutex.unlock();
|
||||
|
||||
// Since we shut down connections in the strand, we want to make sure to complete the shutdown sequence before
|
||||
// stopping the io_context. We let the caller handle closing because the caller is the one keeping track of all
|
||||
// connections (connections_ is only a subset of all connections).
|
||||
close_all_connections();
|
||||
io_context_.stop();
|
||||
MDEBUG("Done with send_stop_signal");
|
||||
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void());
|
||||
}
|
||||
//---------------------------------------------------------------------------------
|
||||
|
||||
@@ -106,7 +106,7 @@ public:
|
||||
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, std::chrono::milliseconds timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
|
||||
|
||||
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
|
||||
bool close(boost::uuids::uuid connection_id);
|
||||
bool close(boost::uuids::uuid connection_id, const bool wait_for_shutdown);
|
||||
bool update_connection_context(const t_connection_context& contxt);
|
||||
bool request_callback(boost::uuids::uuid connection_id);
|
||||
template<class callback_t>
|
||||
@@ -120,7 +120,7 @@ public:
|
||||
|
||||
async_protocol_handler_config():m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL), m_initial_max_packet_size(LEVIN_INITIAL_MAX_PACKET_SIZE), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE), m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
|
||||
{}
|
||||
~async_protocol_handler_config() { set_handler(NULL, NULL); }
|
||||
virtual ~async_protocol_handler_config() { set_handler(NULL, NULL); }
|
||||
void del_out_connections(size_t count);
|
||||
void del_in_connections(size_t count);
|
||||
};
|
||||
@@ -205,7 +205,7 @@ public:
|
||||
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout.count());
|
||||
epee::span<const uint8_t> fake;
|
||||
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
|
||||
con.close();
|
||||
con.close(false);
|
||||
con.finish_outer_call();
|
||||
});
|
||||
m_timer_started = true;
|
||||
@@ -267,7 +267,7 @@ public:
|
||||
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout.count());
|
||||
epee::span<const uint8_t> fake;
|
||||
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
|
||||
con.close();
|
||||
con.close(false);
|
||||
con.finish_outer_call();
|
||||
});
|
||||
}
|
||||
@@ -366,11 +366,11 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
bool close()
|
||||
bool close(const bool wait_for_shutdown)
|
||||
{
|
||||
++m_close_called;
|
||||
|
||||
m_pservice_endpoint->close();
|
||||
m_pservice_endpoint->close(wait_for_shutdown);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -714,7 +714,7 @@ void async_protocol_handler_config<t_connection_context>::delete_connections(siz
|
||||
CRITICAL_REGION_END();
|
||||
|
||||
for (size_t i = 0; i < connections.size() && i < count; ++i)
|
||||
connections[i]->close();
|
||||
connections[i]->close(false);
|
||||
}
|
||||
//------------------------------------------------------------------------------------------
|
||||
template<class t_connection_context>
|
||||
@@ -849,14 +849,14 @@ int async_protocol_handler_config<t_connection_context>::send(byte_slice message
|
||||
}
|
||||
//------------------------------------------------------------------------------------------
|
||||
template<class t_connection_context>
|
||||
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id)
|
||||
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id, const bool wait_for_shutdown)
|
||||
{
|
||||
async_protocol_handler<t_connection_context>* aph = nullptr;
|
||||
if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
|
||||
return false;
|
||||
auto scope_exit_handler = misc_utils::create_scope_leave_handler(
|
||||
boost::bind(&async_protocol_handler<t_connection_context>::finish_outer_call, aph));
|
||||
if (!aph->close())
|
||||
if (!aph->close(wait_for_shutdown))
|
||||
return false;
|
||||
CRITICAL_REGION_LOCAL(m_connects_lock);
|
||||
m_connects.erase(connection_id);
|
||||
|
||||
@@ -441,7 +441,7 @@ namespace net_utils
|
||||
struct i_service_endpoint
|
||||
{
|
||||
virtual bool do_send(byte_slice message)=0;
|
||||
virtual bool close()=0;
|
||||
virtual bool close(const bool wait_for_shutdown)=0;
|
||||
virtual bool send_done()=0;
|
||||
virtual bool call_run_once_service_io()=0;
|
||||
virtual bool request_callback()=0;
|
||||
|
||||
@@ -315,7 +315,7 @@ namespace nodetool
|
||||
}
|
||||
|
||||
for (const auto &c: conns)
|
||||
zone.second.m_net_server.get_config_object().close(c);
|
||||
zone.second.m_net_server.get_config_object().close(c, false);
|
||||
|
||||
conns.clear();
|
||||
}
|
||||
@@ -371,7 +371,7 @@ namespace nodetool
|
||||
return true;
|
||||
});
|
||||
for (const auto &c: conns)
|
||||
zone.second.m_net_server.get_config_object().close(c);
|
||||
zone.second.m_net_server.get_config_object().close(c, false);
|
||||
|
||||
for (int i = 0; i < 2; ++i)
|
||||
zone.second.m_peerlist.filter(i == 0, [&subnet](const peerlist_entry &pe){
|
||||
@@ -1145,20 +1145,27 @@ namespace nodetool
|
||||
bool node_server<t_payload_net_handler>::send_stop_signal()
|
||||
{
|
||||
MDEBUG("[node] sending stop signal");
|
||||
for (auto& zone : m_network_zones)
|
||||
zone.second.m_net_server.send_stop_signal();
|
||||
MDEBUG("[node] Stop signal sent");
|
||||
|
||||
for (auto& zone : m_network_zones)
|
||||
{
|
||||
std::list<boost::uuids::uuid> connection_ids;
|
||||
zone.second.m_net_server.get_config_object().foreach_connection([&](const p2p_connection_context& cntxt) {
|
||||
connection_ids.push_back(cntxt.m_connection_id);
|
||||
return true;
|
||||
});
|
||||
for (const auto &connection_id: connection_ids)
|
||||
zone.second.m_net_server.get_config_object().close(connection_id);
|
||||
const auto close_all_connections = [&, this]()
|
||||
{
|
||||
std::list<boost::uuids::uuid> connection_ids;
|
||||
zone.second.m_net_server.get_config_object().foreach_connection([&](const p2p_connection_context& cntxt) {
|
||||
connection_ids.push_back(cntxt.m_connection_id);
|
||||
return true;
|
||||
});
|
||||
for (const auto &connection_id: connection_ids)
|
||||
{
|
||||
MDEBUG("Closing connection " << connection_id);
|
||||
// We need to wait for every connection's shutdown sequence to complete before stopping the io_context.
|
||||
zone.second.m_net_server.get_config_object().close(connection_id, true/*wait_for_shutdown*/);
|
||||
MDEBUG("Closed connection " << connection_id);
|
||||
}
|
||||
};
|
||||
|
||||
zone.second.m_net_server.send_stop_signal(close_all_connections);
|
||||
}
|
||||
MDEBUG("[node] Stop signal sent");
|
||||
m_payload_handler.stop();
|
||||
return true;
|
||||
}
|
||||
@@ -1245,7 +1252,7 @@ namespace nodetool
|
||||
{
|
||||
LOG_WARNING_CC(context_, "COMMAND_HANDSHAKE Failed");
|
||||
if (!timeout)
|
||||
zone.m_net_server.get_config_object().close(context_.m_connection_id);
|
||||
zone.m_net_server.get_config_object().close(context_.m_connection_id, false);
|
||||
}
|
||||
else if (!just_take_peerlist)
|
||||
{
|
||||
@@ -1279,14 +1286,14 @@ namespace nodetool
|
||||
if(!handle_remote_peerlist(rsp.local_peerlist_new, context))
|
||||
{
|
||||
LOG_WARNING_CC(context, "COMMAND_TIMED_SYNC: failed to handle_remote_peerlist(...), closing connection.");
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_net_server.get_config_object().close(context.m_connection_id );
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_net_server.get_config_object().close(context.m_connection_id, false);
|
||||
add_host_fail(context.m_remote_address);
|
||||
}
|
||||
if(!context.m_is_income)
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_peerlist.set_peer_just_seen(context.peer_id, context.m_remote_address, context.m_pruning_seed, context.m_rpc_port, context.m_rpc_credits_per_hash);
|
||||
if (!m_payload_handler.process_payload_sync_data(rsp.payload_data, context, false))
|
||||
{
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_net_server.get_config_object().close(context.m_connection_id );
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_net_server.get_config_object().close(context.m_connection_id, false);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1443,7 +1450,7 @@ namespace nodetool
|
||||
|
||||
if(just_take_peerlist)
|
||||
{
|
||||
zone.m_net_server.get_config_object().close(con->m_connection_id);
|
||||
zone.m_net_server.get_config_object().close(con->m_connection_id, false);
|
||||
LOG_DEBUG_CC(*con, "CONNECTION HANDSHAKED OK AND CLOSED.");
|
||||
return true;
|
||||
}
|
||||
@@ -1505,7 +1512,7 @@ namespace nodetool
|
||||
return false;
|
||||
}
|
||||
|
||||
zone.m_net_server.get_config_object().close(con->m_connection_id);
|
||||
zone.m_net_server.get_config_object().close(con->m_connection_id, false);
|
||||
|
||||
LOG_DEBUG_CC(*con, "CONNECTION HANDSHAKED OK AND CLOSED.");
|
||||
|
||||
@@ -2435,7 +2442,7 @@ namespace nodetool
|
||||
template<class t_payload_net_handler>
|
||||
bool node_server<t_payload_net_handler>::drop_connection(const epee::net_utils::connection_context_base& context)
|
||||
{
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_net_server.get_config_object().close(context.m_connection_id);
|
||||
m_network_zones.at(context.m_remote_address.get_zone()).m_net_server.get_config_object().close(context.m_connection_id, false);
|
||||
return true;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------
|
||||
@@ -2518,17 +2525,17 @@ namespace nodetool
|
||||
if(rsp.status != PING_OK_RESPONSE_STATUS_TEXT || pr != rsp.peer_id)
|
||||
{
|
||||
LOG_WARNING_CC(ping_context, "back ping invoke wrong response \"" << rsp.status << "\" from" << address.str() << ", hsh_peer_id=" << pr_ << ", rsp.peer_id=" << peerid_to_string(rsp.peer_id));
|
||||
zone.m_net_server.get_config_object().close(ping_context.m_connection_id);
|
||||
zone.m_net_server.get_config_object().close(ping_context.m_connection_id, false);
|
||||
return;
|
||||
}
|
||||
zone.m_net_server.get_config_object().close(ping_context.m_connection_id);
|
||||
zone.m_net_server.get_config_object().close(ping_context.m_connection_id, false);
|
||||
cb();
|
||||
});
|
||||
|
||||
if(!inv_call_res)
|
||||
{
|
||||
LOG_WARNING_CC(ping_context, "back ping invoke failed to " << address.str());
|
||||
zone.m_net_server.get_config_object().close(ping_context.m_connection_id);
|
||||
zone.m_net_server.get_config_object().close(ping_context.m_connection_id, false);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
||||
@@ -159,7 +159,7 @@ namespace
|
||||
return m_send_return;
|
||||
}
|
||||
|
||||
virtual bool close() { return true; }
|
||||
virtual bool close(const bool wait_for_shutdown) { return true; }
|
||||
virtual bool send_done() { return true; }
|
||||
virtual bool call_run_once_service_io() { return true; }
|
||||
virtual bool request_callback() { return true; }
|
||||
|
||||
@@ -110,7 +110,7 @@ namespace
|
||||
{
|
||||
if (!m_connections[id].is_nil())
|
||||
{
|
||||
m_tcp_server.get_config_object().close(m_connections[id]);
|
||||
m_tcp_server.get_config_object().close(m_connections[id], true);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
|
||||
@@ -188,7 +188,7 @@ namespace net_load_tests
|
||||
LOG_PRINT_L0("Connection isn't opened");
|
||||
return false;
|
||||
}
|
||||
if (!m_tcp_server.get_config_object().close(m_connections[idx]))
|
||||
if (!m_tcp_server.get_config_object().close(m_connections[idx], true))
|
||||
{
|
||||
LOG_PRINT_L0("Close connection error: " << m_connections[idx]);
|
||||
if (!ignore_close_fails)
|
||||
|
||||
@@ -177,7 +177,7 @@ namespace
|
||||
if (!ctx.m_closed)
|
||||
{
|
||||
ctx.m_closed = true;
|
||||
m_tcp_server.get_config_object().close(ctx.m_connection_id);
|
||||
m_tcp_server.get_config_object().close(ctx.m_connection_id, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -38,10 +38,12 @@
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "cryptonote_protocol/cryptonote_protocol_defs.h"
|
||||
#include "include_base_utils.h"
|
||||
#include "string_tools.h"
|
||||
#include "net/abstract_tcp_server2.h"
|
||||
#include "net/levin_protocol_handler_async.h"
|
||||
#include "p2p/net_node.h"
|
||||
|
||||
namespace
|
||||
{
|
||||
@@ -238,7 +240,7 @@ TEST(test_epee_connection, test_lifetime)
|
||||
auto tag = create_connection();
|
||||
ASSERT_TRUE(shared_state->get_connections_count() == 1);
|
||||
bool success = shared_state->for_connection(tag, [shared_state](context_t& context){
|
||||
shared_state->close(context.m_connection_id);
|
||||
shared_state->close(context.m_connection_id, true);
|
||||
context.m_remote_address.get_zone();
|
||||
return true;
|
||||
});
|
||||
@@ -254,9 +256,9 @@ TEST(test_epee_connection, test_lifetime)
|
||||
success = shared_state->foreach_connection([&index, shared_state, &tags, &create_connection](context_t& context){
|
||||
if (!index)
|
||||
for (const auto &t: tags)
|
||||
shared_state->close(t);
|
||||
shared_state->close(t, true);
|
||||
|
||||
shared_state->close(context.m_connection_id);
|
||||
shared_state->close(context.m_connection_id, true);
|
||||
context.m_remote_address.get_zone();
|
||||
++index;
|
||||
|
||||
@@ -270,7 +272,7 @@ TEST(test_epee_connection, test_lifetime)
|
||||
|
||||
index = 0;
|
||||
success = shared_state->foreach_connection([&index, shared_state](context_t& context){
|
||||
shared_state->close(context.m_connection_id);
|
||||
shared_state->close(context.m_connection_id, true);
|
||||
context.m_remote_address.get_zone();
|
||||
++index;
|
||||
return true;
|
||||
@@ -300,7 +302,7 @@ TEST(test_epee_connection, test_lifetime)
|
||||
});
|
||||
ASSERT_TRUE(success);
|
||||
}
|
||||
shared_state->close(tag);
|
||||
shared_state->close(tag, true);
|
||||
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||
}
|
||||
|
||||
@@ -454,7 +456,7 @@ TEST(test_epee_connection, test_lifetime)
|
||||
auto tag = context.m_connection_id;
|
||||
boost::asio::post(io_context, [conn] { conn->cancel(); });
|
||||
conn.reset();
|
||||
s->close(tag);
|
||||
s->close(tag, true);
|
||||
while (s->sock_count);
|
||||
}
|
||||
});
|
||||
@@ -647,7 +649,7 @@ TEST(boosted_tcp_server, strand_deadlock)
|
||||
}
|
||||
else if(context.m_recv_cnt == 2) {
|
||||
guard.unlock();
|
||||
socket->close();
|
||||
socket->close(false);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
@@ -715,3 +717,104 @@ TEST(boosted_tcp_server, strand_deadlock)
|
||||
server.timed_wait_server_stop(5 * 1000);
|
||||
server.deinit_server();
|
||||
}
|
||||
|
||||
TEST(boosted_tcp_server, shutdown)
|
||||
{
|
||||
struct context_t: epee::net_utils::connection_context_base {
|
||||
static constexpr size_t get_max_bytes(int) noexcept { return -1; }
|
||||
static constexpr int handshake_command() noexcept { return 1001; }
|
||||
static constexpr bool handshake_complete() noexcept { return true; }
|
||||
};
|
||||
|
||||
struct config_t : epee::levin::async_protocol_handler_config<context_t> {
|
||||
void received_handshake() { handshake_received.raise(); }
|
||||
epee::simple_event handshake_received;
|
||||
};
|
||||
|
||||
struct handler_t : epee::levin::async_protocol_handler<context_t> {
|
||||
using config_type = config_t;
|
||||
using connection_context = context_t;
|
||||
using epee::levin::async_protocol_handler<context_t>::async_protocol_handler;
|
||||
|
||||
bool handle_recv(const void *data, size_t bytes_transferred)
|
||||
{
|
||||
// We don't respond to the handshake (the async_invoke_remote_command2 is waiting for a response)
|
||||
MINFO("handle_recv just came in");
|
||||
config_t* config = dynamic_cast<config_t*>(&m_config);
|
||||
if (config == nullptr)
|
||||
throw std::runtime_error("m_config must be of type config_t");
|
||||
config->received_handshake();
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::make_address("127.0.0.1"), 5262);
|
||||
epee::net_utils::boosted_tcp_server<handler_t> server(epee::net_utils::e_connection_type_P2P);
|
||||
server.init_server(
|
||||
endpoint.port(),
|
||||
endpoint.address().to_string(),
|
||||
{},
|
||||
{},
|
||||
{},
|
||||
true,
|
||||
epee::net_utils::ssl_support_t::e_ssl_support_disabled
|
||||
);
|
||||
|
||||
// Run the server in a thread and wait for it to start
|
||||
MINFO("Starting the server");
|
||||
std::thread running_server([&]{server.run_server(2, true/*wait*/);} );
|
||||
|
||||
// Have the server connect to itself
|
||||
MINFO("Connecting the server to itself");
|
||||
context_t context;
|
||||
{
|
||||
epee::simple_event connected;
|
||||
server.async_call(
|
||||
[&]{
|
||||
ASSERT_TRUE(
|
||||
server.connect(
|
||||
endpoint.address().to_string(),
|
||||
std::to_string(endpoint.port()),
|
||||
5,
|
||||
context,
|
||||
"0.0.0.0",
|
||||
epee::net_utils::ssl_support_t::e_ssl_support_disabled
|
||||
)
|
||||
);
|
||||
connected.raise();
|
||||
}
|
||||
);
|
||||
connected.wait();
|
||||
}
|
||||
|
||||
// Invoke handshake to the connection, and wait for cb cancel in a separate thread
|
||||
MINFO("Invoking handshake");
|
||||
epee::simple_event ev;
|
||||
{
|
||||
using COMMAND_HANDSHAKE = nodetool::COMMAND_HANDSHAKE_T<cryptonote::CORE_SYNC_DATA>;
|
||||
COMMAND_HANDSHAKE::request arg;
|
||||
bool r = epee::net_utils::async_invoke_remote_command2<COMMAND_HANDSHAKE::response>(context, COMMAND_HANDSHAKE::ID, arg, server.get_config_object(),
|
||||
[&ev](int code, const COMMAND_HANDSHAKE::response&, context_t&)
|
||||
{
|
||||
ASSERT_EQ(code, LEVIN_ERROR_CONNECTION_DESTROYED);
|
||||
ev.raise();
|
||||
}, std::chrono::milliseconds{P2P_DEFAULT_HANDSHAKE_INVOKE_TIMEOUT});
|
||||
ASSERT_TRUE(r);
|
||||
|
||||
MINFO("Waiting for handshake invocation to be received");
|
||||
server.get_config_object().handshake_received.wait();
|
||||
}
|
||||
|
||||
// Now stop the server, providing the callback necessary to wait for all connections to shutdown
|
||||
const auto close_all_connections = [&]()
|
||||
{
|
||||
server.get_config_object().close(context.m_connection_id, true/*wait_for_shutdown*/);
|
||||
};
|
||||
|
||||
MINFO("Stopping the server");
|
||||
server.send_stop_signal(close_all_connections);
|
||||
running_server.join();
|
||||
|
||||
MINFO("Waiting for handshake to cancel");
|
||||
ev.wait();
|
||||
}
|
||||
|
||||
@@ -151,7 +151,7 @@ namespace
|
||||
return m_send_return;
|
||||
}
|
||||
|
||||
virtual bool close() { /*std::cout << "test_connection::close()" << std::endl; */return true; }
|
||||
virtual bool close(const bool wait_for_shutdown) { /*std::cout << "test_connection::close()" << std::endl; */return true; }
|
||||
virtual bool send_done() { /*std::cout << "test_connection::send_done()" << std::endl; */return true; }
|
||||
virtual bool call_run_once_service_io() { std::cout << "test_connection::call_run_once_service_io()" << std::endl; return true; }
|
||||
virtual bool request_callback() { std::cout << "test_connection::request_callback()" << std::endl; return true; }
|
||||
@@ -572,7 +572,7 @@ TEST_F(test_levin_protocol_handler__hanle_recv_with_invalid_data, does_not_handl
|
||||
{
|
||||
prepare_buf();
|
||||
|
||||
ASSERT_TRUE(m_conn->m_protocol_handler.close());
|
||||
ASSERT_TRUE(m_conn->m_protocol_handler.close(true));
|
||||
ASSERT_FALSE(m_conn->m_protocol_handler.handle_recv(m_buf.data(), m_buf.size()));
|
||||
}
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ namespace
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual bool close() override final
|
||||
virtual bool close(const bool wait_for_shutdown) override final
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -43,6 +43,21 @@
|
||||
#define MAKE_IPV4_ADDRESS_PORT(a,b,c,d,e) epee::net_utils::ipv4_network_address{MAKE_IP(a,b,c,d),e}
|
||||
#define MAKE_IPV4_SUBNET(a,b,c,d,e) epee::net_utils::ipv4_network_subnet{MAKE_IP(a,b,c,d),e}
|
||||
|
||||
namespace
|
||||
{
|
||||
boost::filesystem::path create_temp_dir()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto path = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path("daemon-%%%%%%%%%%%%%%%%", ec);
|
||||
if (ec)
|
||||
return boost::filesystem::path{};
|
||||
auto success = boost::filesystem::create_directory(path, ec);
|
||||
if (!ec && success)
|
||||
return path;
|
||||
return boost::filesystem::path{};
|
||||
}
|
||||
}
|
||||
|
||||
namespace cryptonote {
|
||||
class blockchain_storage;
|
||||
}
|
||||
@@ -301,17 +316,7 @@ TEST(ban, file_banlist)
|
||||
Server server(cprotocol);
|
||||
cprotocol.set_p2p_endpoint(&server);
|
||||
|
||||
auto create_node_dir = [](){
|
||||
boost::system::error_code ec;
|
||||
auto path = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path("daemon-%%%%%%%%%%%%%%%%", ec);
|
||||
if (ec)
|
||||
return boost::filesystem::path{};
|
||||
auto success = boost::filesystem::create_directory(path, ec);
|
||||
if (!ec && success)
|
||||
return path;
|
||||
return boost::filesystem::path{};
|
||||
};
|
||||
const auto node_dir = create_node_dir();
|
||||
const auto node_dir = create_temp_dir();
|
||||
ASSERT_TRUE(!node_dir.empty());
|
||||
auto auto_remove_node_dir = epee::misc_utils::create_scope_leave_handler([&node_dir](){
|
||||
boost::filesystem::remove_all(node_dir);
|
||||
@@ -616,7 +621,7 @@ TEST(cryptonote_protocol_handler, race_condition)
|
||||
}
|
||||
virtual bool drop_connection(const contexts::basic& context) override {
|
||||
if (shared_state)
|
||||
return shared_state->close(context.m_connection_id);
|
||||
return shared_state->close(context.m_connection_id, true);
|
||||
else
|
||||
return {};
|
||||
}
|
||||
@@ -698,16 +703,6 @@ TEST(cryptonote_protocol_handler, race_condition)
|
||||
handshaked.wait();
|
||||
};
|
||||
using path_t = boost::filesystem::path;
|
||||
auto create_dir = []{
|
||||
ec_t ec;
|
||||
path_t path = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path("daemon-%%%%%%%%%%%%%%%%", ec);
|
||||
if (ec)
|
||||
return path_t{};
|
||||
auto success = boost::filesystem::create_directory(path, ec);
|
||||
if (not ec && success)
|
||||
return path;
|
||||
return path_t{};
|
||||
};
|
||||
auto remove_tree = [](const path_t &path){
|
||||
ec_t ec;
|
||||
boost::filesystem::remove_all(path, ec);
|
||||
@@ -727,7 +722,7 @@ TEST(cryptonote_protocol_handler, race_condition)
|
||||
};
|
||||
using options_description_t = boost::program_options::options_description;
|
||||
|
||||
const auto dir = create_dir();
|
||||
const auto dir = create_temp_dir();
|
||||
ASSERT_TRUE(not dir.empty());
|
||||
|
||||
daemons_t daemon{
|
||||
@@ -1216,21 +1211,11 @@ TEST(node_server, race_condition)
|
||||
};
|
||||
using path_t = boost::filesystem::path;
|
||||
using ec_t = boost::system::error_code;
|
||||
auto create_dir = []{
|
||||
ec_t ec;
|
||||
path_t path = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path("daemon-%%%%%%%%%%%%%%%%", ec);
|
||||
if (ec)
|
||||
return path_t{};
|
||||
auto success = boost::filesystem::create_directory(path, ec);
|
||||
if (not ec && success)
|
||||
return path;
|
||||
return path_t{};
|
||||
};
|
||||
auto remove_tree = [](const path_t &path){
|
||||
ec_t ec;
|
||||
boost::filesystem::remove_all(path, ec);
|
||||
};
|
||||
const auto dir = create_dir();
|
||||
const auto dir = create_temp_dir();
|
||||
ASSERT_TRUE(not dir.empty());
|
||||
protocol_t protocol{};
|
||||
node_server_t node_server(protocol);
|
||||
|
||||
Reference in New Issue
Block a user