p2p: close zone connections before stopping net servers

This commit is contained in:
selsta
2026-05-31 03:09:17 +02:00
parent 3623399ea2
commit f2afe976c7
4 changed files with 88 additions and 42 deletions
@@ -320,7 +320,7 @@ namespace net_utils
bool speed_limit_is_enabled() const; ///< tells us should we be sleeping here (e.g. do not sleep on RPC connections)
bool cancel();
bool cancel(bool wait_for_shutdown = false);
private:
//----------------- i_service_endpoint ---------------------
@@ -378,8 +378,21 @@ namespace net_utils
/// wait for service workers stop
bool timed_wait_server_stop(uint64_t wait_mseconds);
/// Mark the server as stopping without closing connections or stopping the io_context.
bool mark_stop_signal_sent();
/// Close boosted_tcp_server-owned connections, including ones not yet registered with the protocol handler.
void close_server_connections();
/// Stop the server io_context.
void stop_io_context();
/// Stop the server.
void send_stop_signal(std::function<void()> close_all_connections = [](){});
///
/// Warning: Do NOT call this if the io_context is shared for connections
/// managed outside the boosted_tcp_server. See p2p net_node shutdown for
/// the correct staged shutdown in that case.
void send_stop_signal();
bool is_stop_signal_sent() const noexcept { return m_stop_signal_sent; };
@@ -1121,9 +1121,9 @@ namespace net_utils
}
template<typename T>
bool connection<T>::cancel()
bool connection<T>::cancel(const bool wait_for_shutdown)
{
return close(false);
return close(wait_for_shutdown);
}
template<typename T>
@@ -1288,7 +1288,7 @@ namespace net_utils
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::~boosted_tcp_server()
{
this->send_stop_signal();
send_stop_signal();
timed_wait_server_stop(10000);
}
//---------------------------------------------------------------------------------
@@ -1579,30 +1579,55 @@ namespace net_utils
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal(std::function<void()> close_all_connections)
bool boosted_tcp_server<t_protocol_handler>::mark_stop_signal_sent()
{
if (m_stop_signal_sent.exchange(true))
{
MDEBUG("Stop signal already sent");
return;
return false;
}
typename connection<t_protocol_handler>::shared_state *state = static_cast<typename connection<t_protocol_handler>::shared_state*>(m_state.get());
state->stop_signal_sent = true;
TRY_ENTRY();
connections_mutex.lock();
for (auto &c: connections_)
return true;
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::close_server_connections()
{
decltype(connections_) connections;
{
c->cancel();
boost::unique_lock<boost::mutex> lock(connections_mutex);
connections.swap(connections_);
}
connections_.clear();
connections_mutex.unlock();
// Since we shut down connections in the strand, we want to make sure to complete the shutdown sequence before
// stopping the io_context. We let the caller handle closing because the caller is the one keeping track of all
// connections (connections_ is only a subset of all connections).
close_all_connections();
for (auto &c: connections)
{
c->cancel(true/*wait_for_shutdown*/);
}
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::stop_io_context()
{
{
boost::unique_lock<boost::mutex> lock(connections_mutex);
if (!connections_.empty())
{
MERROR("Stopping io_context with " << connections_.size() << " server-owned connections still open");
}
}
MDEBUG("Stopping io_context");
io_context_.stop();
MDEBUG("Done with send_stop_signal");
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
{
TRY_ENTRY();
if (!mark_stop_signal_sent())
return;
close_server_connections();
stop_io_context();
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void());
}
//---------------------------------------------------------------------------------
+28 -17
View File
@@ -1114,26 +1114,37 @@ namespace nodetool
}
MDEBUG("[node] stopping server payload handler");
m_payload_handler.stop();
MDEBUG("[node] sending stop signal");
MDEBUG("[node] marking net servers as stopping");
for (auto& zone : m_network_zones)
{
const auto close_all_connections = [&]()
{
std::list<boost::uuids::uuid> connection_ids;
zone.second.m_net_server.get_config_object().foreach_connection([&](const p2p_connection_context& cntxt) {
connection_ids.push_back(cntxt.m_connection_id);
return true;
});
for (const auto &connection_id: connection_ids)
{
MDEBUG("Closing connection " << connection_id);
// We need to wait for every connection's shutdown sequence to complete before stopping the io_context.
zone.second.m_net_server.get_config_object().close(connection_id, true/*wait_for_shutdown*/);
MDEBUG("Closed connection " << connection_id);
}
};
zone.second.m_net_server.mark_stop_signal_sent();
}
zone.second.m_net_server.send_stop_signal(close_all_connections);
MDEBUG("[node] closing connections");
for (auto& zone : m_network_zones)
{
zone.second.m_net_server.close_server_connections();
std::list<boost::uuids::uuid> connection_ids;
zone.second.m_net_server.get_config_object().foreach_connection([&](const p2p_connection_context& cntxt)
{
connection_ids.push_back(cntxt.m_connection_id);
return true;
});
for (const auto &connection_id: connection_ids)
{
MDEBUG("Closing connection " << connection_id);
// All zone connections must finish shutting down before any shared io_context is stopped.
zone.second.m_net_server.get_config_object().close(connection_id, true/*wait_for_shutdown*/);
MDEBUG("Closed connection " << connection_id);
}
}
MDEBUG("[node] stopping net server io_contexts");
for (auto& zone : m_network_zones)
{
zone.second.m_net_server.stop_io_context();
}
MDEBUG("[node] Stop signal sent");
return true;
+4 -7
View File
@@ -816,14 +816,11 @@ TEST(boosted_tcp_server, shutdown)
server.get_config_object().handshake_received.wait();
}
// Now stop the server, providing the callback necessary to wait for all connections to shutdown
const auto close_all_connections = [&]()
{
server.get_config_object().close(context.m_connection_id, true/*wait_for_shutdown*/);
};
MINFO("Stopping the server");
server.send_stop_signal(close_all_connections);
server.mark_stop_signal_sent();
server.close_server_connections();
server.get_config_object().close(context.m_connection_id, true/*wait_for_shutdown*/);
server.stop_io_context();
running_server.join();
MINFO("Waiting for handshake to cancel");