mirror of
https://github.com/rosenpass/rosenpass.git
synced 2026-02-28 14:33:37 -08:00
fix(rosenpass): Integrate signal handlers with mio
With this commit, rosenpass uses a signal handler based on the signal-hook-mio crate. Even though, in this commit, no rosenpass-rp code is touched, this also fixes the signal handling in rosenpass-rp. The way rosenpass is integrated in rp is a bit of a hack – it just directly embeds rosenpass in the same process (though on a dedicated thread). For this reason, rp now just inherits rosenpass' signal handlers. The rosenpass event_loop() will terminate. The main loop of `rp` just spends most of the time waiting for rosenpass itself to finish, and exits when it finishes. Unfortunately, this means we are not using signalfd(2)[^0]; the signal-hook-mio crate appears to use a pipe-based mechanism to deliver events to mio instead. This may not be such a bad thing, as signalfd has some severe drawbacks with respect to subprocesses and masked signals[^1]. Fixes: #358 (https://github.com/rosenpass/rosenpass/issues/385) Fixes: #522 (https://github.com/rosenpass/rosenpass/issues/522) Fixes: #678 (https://github.com/rosenpass/rosenpass/pull/678) [^0]: https://unixism.net/2021/02/making-signals-less-painful-under-linux/ [^1]: https://ldpreload.com/blog/signalfd-is-useless?reposted-on-request
This commit is contained in:
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -2070,6 +2070,7 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"serial_test",
|
"serial_test",
|
||||||
"signal-hook",
|
"signal-hook",
|
||||||
|
"signal-hook-mio",
|
||||||
"stacker",
|
"stacker",
|
||||||
"static_assertions",
|
"static_assertions",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
@@ -2437,14 +2438,25 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook"
|
name = "signal-hook"
|
||||||
version = "0.3.17"
|
version = "0.3.18"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801"
|
checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "signal-hook-mio"
|
||||||
|
version = "0.2.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"mio",
|
||||||
|
"signal-hook",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.2"
|
version = "1.4.2"
|
||||||
|
|||||||
@@ -54,6 +54,8 @@ serde = { version = "1.0.217", features = ["derive"] }
|
|||||||
arbitrary = { version = "1.4.1", features = ["derive"] }
|
arbitrary = { version = "1.4.1", features = ["derive"] }
|
||||||
anyhow = { version = "1.0.95", features = ["backtrace", "std"] }
|
anyhow = { version = "1.0.95", features = ["backtrace", "std"] }
|
||||||
mio = { version = "1.0.3", features = ["net", "os-poll"] }
|
mio = { version = "1.0.3", features = ["net", "os-poll"] }
|
||||||
|
signal-hook-mio = { version = "0.2.4", features = ["support-v1_0"] }
|
||||||
|
signal-hook = "0.3.17"
|
||||||
oqs-sys = { version = "0.9.1", default-features = false, features = [
|
oqs-sys = { version = "0.9.1", default-features = false, features = [
|
||||||
'classic_mceliece',
|
'classic_mceliece',
|
||||||
'kyber',
|
'kyber',
|
||||||
@@ -79,7 +81,6 @@ hex = { version = "0.4.3" }
|
|||||||
heck = { version = "0.5.0" }
|
heck = { version = "0.5.0" }
|
||||||
libc = { version = "0.2" }
|
libc = { version = "0.2" }
|
||||||
uds = { git = "https://github.com/rosenpass/uds" }
|
uds = { git = "https://github.com/rosenpass/uds" }
|
||||||
signal-hook = "0.3.17"
|
|
||||||
lazy_static = "1.5"
|
lazy_static = "1.5"
|
||||||
|
|
||||||
#Dev dependencies
|
#Dev dependencies
|
||||||
|
|||||||
@@ -64,6 +64,8 @@ clap = { workspace = true }
|
|||||||
clap_complete = { workspace = true }
|
clap_complete = { workspace = true }
|
||||||
clap_mangen = { workspace = true }
|
clap_mangen = { workspace = true }
|
||||||
mio = { workspace = true }
|
mio = { workspace = true }
|
||||||
|
signal-hook = { workspace = true }
|
||||||
|
signal-hook-mio = { workspace = true }
|
||||||
rand = { workspace = true }
|
rand = { workspace = true }
|
||||||
zerocopy = { workspace = true }
|
zerocopy = { workspace = true }
|
||||||
home = { workspace = true }
|
home = { workspace = true }
|
||||||
@@ -76,7 +78,6 @@ heck = { workspace = true, optional = true }
|
|||||||
command-fds = { workspace = true, optional = true }
|
command-fds = { workspace = true, optional = true }
|
||||||
rustix = { workspace = true, optional = true }
|
rustix = { workspace = true, optional = true }
|
||||||
uds = { workspace = true, optional = true, features = ["mio_1xx"] }
|
uds = { workspace = true, optional = true, features = ["mio_1xx"] }
|
||||||
signal-hook = { workspace = true, optional = true }
|
|
||||||
libcrux-test-utils = { workspace = true, optional = true }
|
libcrux-test-utils = { workspace = true, optional = true }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
@@ -109,7 +110,6 @@ experiment_api = [
|
|||||||
"rosenpass-util/experiment_file_descriptor_passing",
|
"rosenpass-util/experiment_file_descriptor_passing",
|
||||||
"rosenpass-wireguard-broker/experiment_api",
|
"rosenpass-wireguard-broker/experiment_api",
|
||||||
]
|
]
|
||||||
internal_signal_handling_for_coverage_reports = ["signal-hook"]
|
|
||||||
internal_testing = []
|
internal_testing = []
|
||||||
internal_bin_gen_ipc_msg_types = ["hex", "heck"]
|
internal_bin_gen_ipc_msg_types = ["hex", "heck"]
|
||||||
trace_bench = ["rosenpass-util/trace_bench", "dep:libcrux-test-utils"]
|
trace_bench = ["rosenpass-util/trace_bench", "dep:libcrux-test-utils"]
|
||||||
|
|||||||
@@ -7,17 +7,20 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSoc
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use std::{cell::Cell, fmt::Debug, io, path::PathBuf, slice};
|
use std::{cell::Cell, fmt::Debug, io, path::PathBuf, slice};
|
||||||
|
|
||||||
|
use mio::{Interest, Token};
|
||||||
|
use signal_hook_mio::v1_0 as signal_hook_mio;
|
||||||
|
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
use derive_builder::Builder;
|
use derive_builder::Builder;
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use mio::{Interest, Token};
|
|
||||||
use zerocopy::AsBytes;
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
use rosenpass_util::attempt;
|
use rosenpass_util::attempt;
|
||||||
|
use rosenpass_util::fmt::debug::NullDebug;
|
||||||
use rosenpass_util::functional::{run, ApplyExt};
|
use rosenpass_util::functional::{run, ApplyExt};
|
||||||
use rosenpass_util::io::{IoResultKindHintExt, SubstituteForIoErrorKindExt};
|
use rosenpass_util::io::{IoResultKindHintExt, SubstituteForIoErrorKindExt};
|
||||||
use rosenpass_util::{
|
use rosenpass_util::{
|
||||||
b64::B64Display, build::ConstructionSite, file::StoreValueB64, option::SomeExt, result::OkExt,
|
b64::B64Display, build::ConstructionSite, file::StoreValueB64, result::OkExt,
|
||||||
};
|
};
|
||||||
|
|
||||||
use rosenpass_secret_memory::{Public, Secret};
|
use rosenpass_secret_memory::{Public, Secret};
|
||||||
@@ -286,12 +289,20 @@ pub enum AppServerIoSource {
|
|||||||
Socket(usize),
|
Socket(usize),
|
||||||
/// IO source refers to a PSK broker in [AppServer::brokers]
|
/// IO source refers to a PSK broker in [AppServer::brokers]
|
||||||
PskBroker(Public<BROKER_ID_BYTES>),
|
PskBroker(Public<BROKER_ID_BYTES>),
|
||||||
|
/// IO source refers to our signal handlers
|
||||||
|
SignalHandler,
|
||||||
/// IO source refers to some IO sources used in the API;
|
/// IO source refers to some IO sources used in the API;
|
||||||
/// see [AppServer::api_manager]
|
/// see [AppServer::api_manager]
|
||||||
#[cfg(feature = "experiment_api")]
|
#[cfg(feature = "experiment_api")]
|
||||||
MioManager(crate::api::mio::MioManagerIoSource),
|
MioManager(crate::api::mio::MioManagerIoSource),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum AppServerTryRecvResult {
|
||||||
|
None,
|
||||||
|
Terminate,
|
||||||
|
NetworkMessage(usize, Endpoint),
|
||||||
|
}
|
||||||
|
|
||||||
/// Number of epoll(7) events Rosenpass can receive at a time
|
/// Number of epoll(7) events Rosenpass can receive at a time
|
||||||
const EVENT_CAPACITY: usize = 20;
|
const EVENT_CAPACITY: usize = 20;
|
||||||
|
|
||||||
@@ -332,6 +343,8 @@ pub struct AppServer {
|
|||||||
/// MIO associates IO sources with numeric tokens. This struct takes care of generating these
|
/// MIO associates IO sources with numeric tokens. This struct takes care of generating these
|
||||||
/// tokens
|
/// tokens
|
||||||
pub mio_token_dispenser: MioTokenDispenser,
|
pub mio_token_dispenser: MioTokenDispenser,
|
||||||
|
/// Mio-based handler for signals
|
||||||
|
pub signal_handler: NullDebug<signal_hook_mio::Signals>,
|
||||||
/// Helpers handling communication with WireGuard; these take a generated key and forward it to
|
/// Helpers handling communication with WireGuard; these take a generated key and forward it to
|
||||||
/// WireGuard
|
/// WireGuard
|
||||||
pub brokers: BrokerStore,
|
pub brokers: BrokerStore,
|
||||||
@@ -357,16 +370,6 @@ pub struct AppServer {
|
|||||||
/// Used by integration tests to force [Self] into DoS condition
|
/// Used by integration tests to force [Self] into DoS condition
|
||||||
/// and to terminate the AppServer after the test is complete
|
/// and to terminate the AppServer after the test is complete
|
||||||
pub test_helpers: Option<AppServerTest>,
|
pub test_helpers: Option<AppServerTest>,
|
||||||
/// Helper for integration tests running rosenpass as a subprocess
|
|
||||||
/// to terminate properly upon receiving an appropriate system signal.
|
|
||||||
///
|
|
||||||
/// This is primarily needed for coverage testing, since llvm-cov does not
|
|
||||||
/// write coverage reports to disk when a process is stopped by the default
|
|
||||||
/// signal handler.
|
|
||||||
///
|
|
||||||
/// See <https://github.com/rosenpass/rosenpass/issues/385>
|
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
|
||||||
pub term_signal: terminate::TerminateRequested,
|
|
||||||
#[cfg(feature = "experiment_api")]
|
#[cfg(feature = "experiment_api")]
|
||||||
/// The Rosenpass unix socket API handler; this is an experimental
|
/// The Rosenpass unix socket API handler; this is an experimental
|
||||||
/// feature that can be used to embed Rosenpass in external applications
|
/// feature that can be used to embed Rosenpass in external applications
|
||||||
@@ -456,6 +459,8 @@ impl AppPeerPtr {
|
|||||||
/// Instructs [AppServer::event_loop_without_error_handling] on how to proceed.
|
/// Instructs [AppServer::event_loop_without_error_handling] on how to proceed.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum AppPollResult {
|
pub enum AppPollResult {
|
||||||
|
/// Received request to terminate the application
|
||||||
|
Terminate,
|
||||||
/// Erase the key for a given peer. Corresponds to [crate::protocol::PollResult::DeleteKey]
|
/// Erase the key for a given peer. Corresponds to [crate::protocol::PollResult::DeleteKey]
|
||||||
DeleteKey(AppPeerPtr),
|
DeleteKey(AppPeerPtr),
|
||||||
/// Send an initiation to the given peer. Corresponds to [crate::protocol::PollResult::SendInitiation]
|
/// Send an initiation to the given peer. Corresponds to [crate::protocol::PollResult::SendInitiation]
|
||||||
@@ -802,10 +807,27 @@ impl AppServer {
|
|||||||
verbosity: Verbosity,
|
verbosity: Verbosity,
|
||||||
test_helpers: Option<AppServerTest>,
|
test_helpers: Option<AppServerTest>,
|
||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
// setup mio
|
// Setup Mio itself
|
||||||
let mio_poll = mio::Poll::new()?;
|
let mio_poll = mio::Poll::new()?;
|
||||||
let events = mio::Events::with_capacity(EVENT_CAPACITY);
|
let events = mio::Events::with_capacity(EVENT_CAPACITY);
|
||||||
|
|
||||||
|
// And helpers to map mio tokens to internal event types
|
||||||
let mut mio_token_dispenser = MioTokenDispenser::default();
|
let mut mio_token_dispenser = MioTokenDispenser::default();
|
||||||
|
let mut io_source_index = HashMap::new();
|
||||||
|
|
||||||
|
// Setup signal handling
|
||||||
|
let signal_handler = attempt!({
|
||||||
|
let mut handler =
|
||||||
|
signal_hook_mio::Signals::new(signal_hook::consts::TERM_SIGNALS.iter())?;
|
||||||
|
let mio_token = mio_token_dispenser.dispense();
|
||||||
|
mio_poll
|
||||||
|
.registry()
|
||||||
|
.register(&mut handler, mio_token, Interest::READABLE)?;
|
||||||
|
let prev = io_source_index.insert(mio_token, AppServerIoSource::SignalHandler);
|
||||||
|
assert!(prev.is_none());
|
||||||
|
Ok(NullDebug(handler))
|
||||||
|
})
|
||||||
|
.context("Failed to set up signal (user triggered program termination) handler")?;
|
||||||
|
|
||||||
// bind each SocketAddr to a socket
|
// bind each SocketAddr to a socket
|
||||||
let maybe_sockets: Result<Vec<_>, _> =
|
let maybe_sockets: Result<Vec<_>, _> =
|
||||||
@@ -879,7 +901,6 @@ impl AppServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// register all sockets to mio
|
// register all sockets to mio
|
||||||
let mut io_source_index = HashMap::new();
|
|
||||||
for (idx, socket) in sockets.iter_mut().enumerate() {
|
for (idx, socket) in sockets.iter_mut().enumerate() {
|
||||||
let mio_token = mio_token_dispenser.dispense();
|
let mio_token = mio_token_dispenser.dispense();
|
||||||
mio_poll
|
mio_poll
|
||||||
@@ -895,8 +916,6 @@ impl AppServer {
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
|
||||||
term_signal: terminate::TerminateRequested::new()?,
|
|
||||||
crypto_site,
|
crypto_site,
|
||||||
peers: Vec::new(),
|
peers: Vec::new(),
|
||||||
verbosity,
|
verbosity,
|
||||||
@@ -907,6 +926,7 @@ impl AppServer {
|
|||||||
io_source_index,
|
io_source_index,
|
||||||
mio_poll,
|
mio_poll,
|
||||||
mio_token_dispenser,
|
mio_token_dispenser,
|
||||||
|
signal_handler,
|
||||||
brokers: BrokerStore::default(),
|
brokers: BrokerStore::default(),
|
||||||
all_sockets_drained: false,
|
all_sockets_drained: false,
|
||||||
under_load: DoSOperation::Normal,
|
under_load: DoSOperation::Normal,
|
||||||
@@ -1049,7 +1069,7 @@ impl AppServer {
|
|||||||
Ok(AppPeerPtr(pn))
|
Ok(AppPeerPtr(pn))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Main IO handler; this generally does not terminate
|
/// Main IO handler; this generally does not terminate other than through unix signals
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
@@ -1066,23 +1086,6 @@ impl AppServer {
|
|||||||
Err(e) => e,
|
Err(e) => e,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
|
||||||
{
|
|
||||||
let terminated_by_signal = err
|
|
||||||
.downcast_ref::<std::io::Error>()
|
|
||||||
.filter(|e| e.kind() == std::io::ErrorKind::Interrupted)
|
|
||||||
.filter(|_| self.term_signal.value())
|
|
||||||
.is_some();
|
|
||||||
if terminated_by_signal {
|
|
||||||
log::warn!(
|
|
||||||
"\
|
|
||||||
Terminated by signal; this signal handler is correct during coverage testing \
|
|
||||||
but should be otherwise disabled"
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should not happen…
|
// This should not happen…
|
||||||
failure_cnt = if msgs_processed > 0 {
|
failure_cnt = if msgs_processed > 0 {
|
||||||
0
|
0
|
||||||
@@ -1135,6 +1138,7 @@ impl AppServer {
|
|||||||
use AppPollResult::*;
|
use AppPollResult::*;
|
||||||
use KeyOutputReason::*;
|
use KeyOutputReason::*;
|
||||||
|
|
||||||
|
// TODO: We should read from this using a mio channel
|
||||||
if let Some(AppServerTest {
|
if let Some(AppServerTest {
|
||||||
termination_handler: Some(terminate),
|
termination_handler: Some(terminate),
|
||||||
..
|
..
|
||||||
@@ -1158,6 +1162,8 @@ impl AppServer {
|
|||||||
|
|
||||||
#[allow(clippy::redundant_closure_call)]
|
#[allow(clippy::redundant_closure_call)]
|
||||||
match (have_crypto, poll_result) {
|
match (have_crypto, poll_result) {
|
||||||
|
(_, Terminate) => return Ok(()),
|
||||||
|
|
||||||
(CryptoSrv::Missing, SendInitiation(_)) => {}
|
(CryptoSrv::Missing, SendInitiation(_)) => {}
|
||||||
(CryptoSrv::Avail, SendInitiation(peer)) => tx_maybe_with!(peer, || self
|
(CryptoSrv::Avail, SendInitiation(peer)) => tx_maybe_with!(peer, || self
|
||||||
.crypto_server_mut()?
|
.crypto_server_mut()?
|
||||||
@@ -1305,6 +1311,7 @@ impl AppServer {
|
|||||||
pub fn poll(&mut self, rx_buf: &mut [u8]) -> anyhow::Result<AppPollResult> {
|
pub fn poll(&mut self, rx_buf: &mut [u8]) -> anyhow::Result<AppPollResult> {
|
||||||
use crate::protocol::PollResult as C;
|
use crate::protocol::PollResult as C;
|
||||||
use AppPollResult as A;
|
use AppPollResult as A;
|
||||||
|
use AppServerTryRecvResult as R;
|
||||||
let res = loop {
|
let res = loop {
|
||||||
// Call CryptoServer's poll (if available)
|
// Call CryptoServer's poll (if available)
|
||||||
let crypto_poll = self
|
let crypto_poll = self
|
||||||
@@ -1325,8 +1332,10 @@ impl AppServer {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Perform IO (look for a message)
|
// Perform IO (look for a message)
|
||||||
if let Some((len, addr)) = self.try_recv(rx_buf, io_poll_timeout)? {
|
match self.try_recv(rx_buf, io_poll_timeout)? {
|
||||||
break A::ReceivedMessage(len, addr);
|
R::None => {}
|
||||||
|
R::Terminate => break A::Terminate,
|
||||||
|
R::NetworkMessage(len, addr) => break A::ReceivedMessage(len, addr),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1344,12 +1353,12 @@ impl AppServer {
|
|||||||
&mut self,
|
&mut self,
|
||||||
buf: &mut [u8],
|
buf: &mut [u8],
|
||||||
timeout: Timing,
|
timeout: Timing,
|
||||||
) -> anyhow::Result<Option<(usize, Endpoint)>> {
|
) -> anyhow::Result<AppServerTryRecvResult> {
|
||||||
let timeout = Duration::from_secs_f64(timeout);
|
let timeout = Duration::from_secs_f64(timeout);
|
||||||
|
|
||||||
// if there is no time to wait on IO, well, then, lets not waste any time!
|
// if there is no time to wait on IO, well, then, lets not waste any time!
|
||||||
if timeout.is_zero() {
|
if timeout.is_zero() {
|
||||||
return Ok(None);
|
return Ok(AppServerTryRecvResult::None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE when using mio::Poll, there are some particularities (taken from
|
// NOTE when using mio::Poll, there are some particularities (taken from
|
||||||
@@ -1459,12 +1468,19 @@ impl AppServer {
|
|||||||
// blocking poll, we go through all available IO sources to see if we missed anything.
|
// blocking poll, we go through all available IO sources to see if we missed anything.
|
||||||
{
|
{
|
||||||
while let Some(ev) = self.short_poll_queue.pop_front() {
|
while let Some(ev) = self.short_poll_queue.pop_front() {
|
||||||
if let Some(v) = self.try_recv_from_mio_token(buf, ev.token())? {
|
match self.try_recv_from_mio_token(buf, ev.token())? {
|
||||||
return Ok(Some(v));
|
AppServerTryRecvResult::None => continue,
|
||||||
|
res => return Ok(res),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Drain operating system signals
|
||||||
|
match self.try_recv_from_signal_handler()? {
|
||||||
|
AppServerTryRecvResult::None => {} // Nop
|
||||||
|
res => return Ok(res),
|
||||||
|
}
|
||||||
|
|
||||||
// drain all sockets
|
// drain all sockets
|
||||||
let mut would_block_count = 0;
|
let mut would_block_count = 0;
|
||||||
for sock_no in 0..self.sockets.len() {
|
for sock_no in 0..self.sockets.len() {
|
||||||
@@ -1472,11 +1488,11 @@ impl AppServer {
|
|||||||
.try_recv_from_listen_socket(buf, sock_no)
|
.try_recv_from_listen_socket(buf, sock_no)
|
||||||
.io_err_kind_hint()
|
.io_err_kind_hint()
|
||||||
{
|
{
|
||||||
Ok(None) => continue,
|
Ok(AppServerTryRecvResult::None) => continue,
|
||||||
Ok(Some(v)) => {
|
Ok(res) => {
|
||||||
// at least one socket was not drained...
|
// at least one socket was not drained...
|
||||||
self.all_sockets_drained = false;
|
self.all_sockets_drained = false;
|
||||||
return Ok(Some(v));
|
return Ok(res);
|
||||||
}
|
}
|
||||||
Err((_, ErrorKind::WouldBlock)) => {
|
Err((_, ErrorKind::WouldBlock)) => {
|
||||||
would_block_count += 1;
|
would_block_count += 1;
|
||||||
@@ -1504,12 +1520,24 @@ impl AppServer {
|
|||||||
|
|
||||||
self.performed_long_poll = true;
|
self.performed_long_poll = true;
|
||||||
|
|
||||||
Ok(None)
|
Ok(AppServerTryRecvResult::None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Internal helper for [Self::try_recv]
|
/// Internal helper for [Self::try_recv]
|
||||||
fn perform_mio_poll_and_register_events(&mut self, timeout: Duration) -> io::Result<()> {
|
fn perform_mio_poll_and_register_events(&mut self, timeout: Duration) -> io::Result<()> {
|
||||||
self.mio_poll.poll(&mut self.events, Some(timeout))?;
|
loop {
|
||||||
|
use std::io::ErrorKind as IOE;
|
||||||
|
match self
|
||||||
|
.mio_poll
|
||||||
|
.poll(&mut self.events, Some(timeout))
|
||||||
|
.io_err_kind_hint()
|
||||||
|
{
|
||||||
|
Ok(()) => break,
|
||||||
|
Err((_, IOE::Interrupted)) => continue,
|
||||||
|
Err((err, _)) => return Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Fill the short poll buffer with the acquired events
|
// Fill the short poll buffer with the acquired events
|
||||||
self.events
|
self.events
|
||||||
.iter()
|
.iter()
|
||||||
@@ -1523,12 +1551,12 @@ impl AppServer {
|
|||||||
&mut self,
|
&mut self,
|
||||||
buf: &mut [u8],
|
buf: &mut [u8],
|
||||||
token: mio::Token,
|
token: mio::Token,
|
||||||
) -> anyhow::Result<Option<(usize, Endpoint)>> {
|
) -> anyhow::Result<AppServerTryRecvResult> {
|
||||||
let io_source = match self.io_source_index.get(&token) {
|
let io_source = match self.io_source_index.get(&token) {
|
||||||
Some(io_source) => *io_source,
|
Some(io_source) => *io_source,
|
||||||
None => {
|
None => {
|
||||||
log::warn!("No IO source assiociated with mio token ({token:?}). Polling using mio tokens directly is an experimental feature and IO handler should recover when all available io sources are polled. This is a developer error. Please report it.");
|
log::warn!("No IO source assiociated with mio token ({token:?}). Polling using mio tokens directly is an experimental feature and IO handler should recover when all available io sources are polled. This is a developer error. Please report it.");
|
||||||
return Ok(None);
|
return Ok(AppServerTryRecvResult::None);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1540,11 +1568,13 @@ impl AppServer {
|
|||||||
&mut self,
|
&mut self,
|
||||||
buf: &mut [u8],
|
buf: &mut [u8],
|
||||||
io_source: AppServerIoSource,
|
io_source: AppServerIoSource,
|
||||||
) -> anyhow::Result<Option<(usize, Endpoint)>> {
|
) -> anyhow::Result<AppServerTryRecvResult> {
|
||||||
match io_source {
|
match io_source {
|
||||||
|
AppServerIoSource::SignalHandler => self.try_recv_from_signal_handler()?.ok(),
|
||||||
|
|
||||||
AppServerIoSource::Socket(idx) => self
|
AppServerIoSource::Socket(idx) => self
|
||||||
.try_recv_from_listen_socket(buf, idx)
|
.try_recv_from_listen_socket(buf, idx)
|
||||||
.substitute_for_ioerr_wouldblock(None)?
|
.substitute_for_ioerr_wouldblock(AppServerTryRecvResult::None)?
|
||||||
.ok(),
|
.ok(),
|
||||||
|
|
||||||
AppServerIoSource::PskBroker(key) => self
|
AppServerIoSource::PskBroker(key) => self
|
||||||
@@ -1553,7 +1583,7 @@ impl AppServer {
|
|||||||
.get_mut(&key)
|
.get_mut(&key)
|
||||||
.with_context(|| format!("No PSK broker under key {key:?}"))?
|
.with_context(|| format!("No PSK broker under key {key:?}"))?
|
||||||
.process_poll()
|
.process_poll()
|
||||||
.map(|_| None),
|
.map(|_| AppServerTryRecvResult::None),
|
||||||
|
|
||||||
#[cfg(feature = "experiment_api")]
|
#[cfg(feature = "experiment_api")]
|
||||||
AppServerIoSource::MioManager(mmio_src) => {
|
AppServerIoSource::MioManager(mmio_src) => {
|
||||||
@@ -1561,17 +1591,28 @@ impl AppServer {
|
|||||||
|
|
||||||
MioManagerFocus(self)
|
MioManagerFocus(self)
|
||||||
.poll_particular(mmio_src)
|
.poll_particular(mmio_src)
|
||||||
.map(|_| None)
|
.map(|_| AppServerTryRecvResult::None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Internal helper for [Self::try_recv]
|
||||||
|
fn try_recv_from_signal_handler(&mut self) -> io::Result<AppServerTryRecvResult> {
|
||||||
|
#[allow(clippy::never_loop)]
|
||||||
|
for signal in self.signal_handler.pending() {
|
||||||
|
log::debug!("Received operating system signal no {signal}.");
|
||||||
|
log::info!("Received termination request; exiting.");
|
||||||
|
return Ok(AppServerTryRecvResult::Terminate);
|
||||||
|
}
|
||||||
|
Ok(AppServerTryRecvResult::None)
|
||||||
|
}
|
||||||
|
|
||||||
/// Internal helper for [Self::try_recv]
|
/// Internal helper for [Self::try_recv]
|
||||||
fn try_recv_from_listen_socket(
|
fn try_recv_from_listen_socket(
|
||||||
&mut self,
|
&mut self,
|
||||||
buf: &mut [u8],
|
buf: &mut [u8],
|
||||||
idx: usize,
|
idx: usize,
|
||||||
) -> io::Result<Option<(usize, Endpoint)>> {
|
) -> io::Result<AppServerTryRecvResult> {
|
||||||
use std::io::ErrorKind as K;
|
use std::io::ErrorKind as K;
|
||||||
let (n, addr) = loop {
|
let (n, addr) = loop {
|
||||||
match self.sockets[idx].recv_from(buf).io_err_kind_hint() {
|
match self.sockets[idx].recv_from(buf).io_err_kind_hint() {
|
||||||
@@ -1583,8 +1624,7 @@ impl AppServer {
|
|||||||
SocketPtr(idx)
|
SocketPtr(idx)
|
||||||
.apply(|sp| SocketBoundEndpoint::new(sp, addr))
|
.apply(|sp| SocketBoundEndpoint::new(sp, addr))
|
||||||
.apply(Endpoint::SocketBoundAddress)
|
.apply(Endpoint::SocketBoundAddress)
|
||||||
.apply(|ep| (n, ep))
|
.apply(|ep| AppServerTryRecvResult::NetworkMessage(n, ep))
|
||||||
.some()
|
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1636,48 +1676,3 @@ impl crate::api::mio::MioManagerContext for MioManagerFocus<'_> {
|
|||||||
self.0
|
self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// These signal handlers are used exclusively used during coverage testing
|
|
||||||
/// to ensure that the llvm-cov can produce reports during integration tests
|
|
||||||
/// with multiple processes where subprocesses are terminated via kill(2).
|
|
||||||
///
|
|
||||||
/// llvm-cov does not support producing coverage reports when the process exits
|
|
||||||
/// through a signal, so this is necessary.
|
|
||||||
///
|
|
||||||
/// The functionality of exiting gracefully upon reception of a terminating signal
|
|
||||||
/// is desired for the production variant of Rosenpass, but we should make sure
|
|
||||||
/// to use a higher quality implementation; in particular, we should use signalfd(2).
|
|
||||||
///
|
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
|
||||||
mod terminate {
|
|
||||||
use signal_hook::flag::register as sig_register;
|
|
||||||
use std::sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Automatically register a signal handler for common termination signals;
|
|
||||||
/// whether one of these signals was issued can be polled using [Self::value].
|
|
||||||
///
|
|
||||||
/// The signal handler is not removed when this struct goes out of scope.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct TerminateRequested {
|
|
||||||
value: Arc<AtomicBool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TerminateRequested {
|
|
||||||
/// Register signal handlers watching for common termination signals
|
|
||||||
pub fn new() -> anyhow::Result<Self> {
|
|
||||||
let value = Arc::new(AtomicBool::new(false));
|
|
||||||
for sig in signal_hook::consts::TERM_SIGNALS.iter().copied() {
|
|
||||||
sig_register(sig, Arc::clone(&value))?;
|
|
||||||
}
|
|
||||||
Ok(Self { value })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check whether a termination signal has been set
|
|
||||||
pub fn value(&self) -> bool {
|
|
||||||
self.value.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -630,7 +630,11 @@ version = "3.2.0"
|
|||||||
criteria = "safe-to-run"
|
criteria = "safe-to-run"
|
||||||
|
|
||||||
[[exemptions.signal-hook]]
|
[[exemptions.signal-hook]]
|
||||||
version = "0.3.17"
|
version = "0.3.18"
|
||||||
|
criteria = "safe-to-deploy"
|
||||||
|
|
||||||
|
[[exemptions.signal-hook-mio]]
|
||||||
|
version = "0.2.4"
|
||||||
criteria = "safe-to-deploy"
|
criteria = "safe-to-deploy"
|
||||||
|
|
||||||
[[exemptions.signal-hook-registry]]
|
[[exemptions.signal-hook-registry]]
|
||||||
|
|||||||
82
util/src/fmt/debug.rs
Normal file
82
util/src/fmt/debug.rs
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
//! Helpers for string formatting with the debug formatter; extensions for [std::fmt::Debug]
|
||||||
|
|
||||||
|
use std::any::type_name;
|
||||||
|
use std::borrow::{Borrow, BorrowMut};
|
||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
|
/// Debug formatter which just prints the type name;
|
||||||
|
/// used to wrap values which do not support the Debug
|
||||||
|
/// trait themselves
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use rosenpass_util::fmt::debug::NullDebug;
|
||||||
|
///
|
||||||
|
/// // Does not implement debug
|
||||||
|
/// struct NoDebug;
|
||||||
|
///
|
||||||
|
/// #[derive(Debug)]
|
||||||
|
/// struct ShouldSupportDebug {
|
||||||
|
/// #[allow(dead_code)]
|
||||||
|
/// no_debug: NullDebug<NoDebug>,
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// let val = ShouldSupportDebug {
|
||||||
|
/// no_debug: NullDebug(NoDebug),
|
||||||
|
/// };
|
||||||
|
/// ```
|
||||||
|
pub struct NullDebug<T>(pub T);
|
||||||
|
|
||||||
|
impl<T> std::fmt::Debug for NullDebug<T> {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.write_str("NullDebug<")?;
|
||||||
|
f.write_str(type_name::<T>())?;
|
||||||
|
f.write_str(">")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<T> for NullDebug<T> {
|
||||||
|
fn from(value: T) -> Self {
|
||||||
|
NullDebug(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for NullDebug<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
self.0.borrow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for NullDebug<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
self.0.borrow_mut()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Borrow<T> for NullDebug<T> {
|
||||||
|
fn borrow(&self) -> &T {
|
||||||
|
self.deref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> BorrowMut<T> for NullDebug<T> {
|
||||||
|
fn borrow_mut(&mut self) -> &mut T {
|
||||||
|
self.deref_mut()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> AsRef<T> for NullDebug<T> {
|
||||||
|
fn as_ref(&self) -> &T {
|
||||||
|
self.deref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> AsMut<T> for NullDebug<T> {
|
||||||
|
fn as_mut(&mut self) -> &mut T {
|
||||||
|
self.deref_mut()
|
||||||
|
}
|
||||||
|
}
|
||||||
3
util/src/fmt/mod.rs
Normal file
3
util/src/fmt/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
//! Helpers for string formatting; extensions for [std::fmt]
|
||||||
|
|
||||||
|
pub mod debug;
|
||||||
@@ -14,6 +14,7 @@ pub mod controlflow;
|
|||||||
pub mod fd;
|
pub mod fd;
|
||||||
/// File system operations and handling.
|
/// File system operations and handling.
|
||||||
pub mod file;
|
pub mod file;
|
||||||
|
pub mod fmt;
|
||||||
/// Functional programming utilities.
|
/// Functional programming utilities.
|
||||||
pub mod functional;
|
pub mod functional;
|
||||||
/// Input/output operations.
|
/// Input/output operations.
|
||||||
|
|||||||
Reference in New Issue
Block a user