From 3c0e16734704e4a5048a8487dc707f0b7e0b3ce0 Mon Sep 17 00:00:00 2001 From: Karolin Varner Date: Sun, 3 Aug 2025 14:49:38 +0200 Subject: [PATCH] fix(rosenpass): Integrate signal handlers with mio MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With this commit, rosenpass uses a signal handler based on the signal-hook-mio crate. Even though, in this commit, no rosenpass-rp code is touched, this also fixes the signal handling in rosenpass-rp. The way rosenpass is integrated in rp is a bit of a hack – it just directly embeds rosenpass in the same process (though on a dedicated thread). For this reason, rp now just inherits rosenpass' signal handlers. The rosenpass event_loop() will terminate. The main loop of `rp` just spends most of the time waiting for rosenpass itself to finish, and exits when it finishes. Unfortunately, this means we are not using signalfd(2)[^0]; the signal-hook-mio crate appears to use a pipe-based mechanism to deliver events to mio instead. This may not be such a bad thing, as signalfd has some severe drawbacks with respect to subprocesses and masked signals[^1]. Fixes: #358 (https://github.com/rosenpass/rosenpass/issues/385) Fixes: #522 (https://github.com/rosenpass/rosenpass/issues/522) Fixes: #678 (https://github.com/rosenpass/rosenpass/pull/678) [^0]: https://unixism.net/2021/02/making-signals-less-painful-under-linux/ [^1]: https://ldpreload.com/blog/signalfd-is-useless?reposted-on-request --- Cargo.lock | 16 ++- Cargo.toml | 3 +- rosenpass/Cargo.toml | 4 +- rosenpass/src/app_server.rs | 193 ++++++++++++++++++------------------ supply-chain/config.toml | 6 +- util/src/fmt/debug.rs | 82 +++++++++++++++ util/src/fmt/mod.rs | 3 + util/src/lib.rs | 1 + 8 files changed, 203 insertions(+), 105 deletions(-) create mode 100644 util/src/fmt/debug.rs create mode 100644 util/src/fmt/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e029556..44e4884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2070,6 +2070,7 @@ dependencies = [ "serde", "serial_test", "signal-hook", + "signal-hook-mio", "stacker", "static_assertions", "tempfile", @@ -2437,14 +2438,25 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" dependencies = [ "libc", "signal-hook-registry", ] +[[package]] +name = "signal-hook-mio" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" diff --git a/Cargo.toml b/Cargo.toml index 9b94447..8e9b19c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,8 @@ serde = { version = "1.0.217", features = ["derive"] } arbitrary = { version = "1.4.1", features = ["derive"] } anyhow = { version = "1.0.95", features = ["backtrace", "std"] } mio = { version = "1.0.3", features = ["net", "os-poll"] } +signal-hook-mio = { version = "0.2.4", features = ["support-v1_0"] } +signal-hook = "0.3.17" oqs-sys = { version = "0.9.1", default-features = false, features = [ 'classic_mceliece', 'kyber', @@ -79,7 +81,6 @@ hex = { version = "0.4.3" } heck = { version = "0.5.0" } libc = { version = "0.2" } uds = { git = "https://github.com/rosenpass/uds" } -signal-hook = "0.3.17" lazy_static = "1.5" #Dev dependencies diff --git a/rosenpass/Cargo.toml b/rosenpass/Cargo.toml index c23d1f3..9ce2686 100644 --- a/rosenpass/Cargo.toml +++ b/rosenpass/Cargo.toml @@ -64,6 +64,8 @@ clap = { workspace = true } clap_complete = { workspace = true } clap_mangen = { workspace = true } mio = { workspace = true } +signal-hook = { workspace = true } +signal-hook-mio = { workspace = true } rand = { workspace = true } zerocopy = { workspace = true } home = { workspace = true } @@ -76,7 +78,6 @@ heck = { workspace = true, optional = true } command-fds = { workspace = true, optional = true } rustix = { workspace = true, optional = true } uds = { workspace = true, optional = true, features = ["mio_1xx"] } -signal-hook = { workspace = true, optional = true } libcrux-test-utils = { workspace = true, optional = true } [build-dependencies] @@ -109,7 +110,6 @@ experiment_api = [ "rosenpass-util/experiment_file_descriptor_passing", "rosenpass-wireguard-broker/experiment_api", ] -internal_signal_handling_for_coverage_reports = ["signal-hook"] internal_testing = [] internal_bin_gen_ipc_msg_types = ["hex", "heck"] trace_bench = ["rosenpass-util/trace_bench", "dep:libcrux-test-utils"] diff --git a/rosenpass/src/app_server.rs b/rosenpass/src/app_server.rs index 59f3192..eeff163 100644 --- a/rosenpass/src/app_server.rs +++ b/rosenpass/src/app_server.rs @@ -7,17 +7,20 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSoc use std::time::{Duration, Instant}; use std::{cell::Cell, fmt::Debug, io, path::PathBuf, slice}; +use mio::{Interest, Token}; +use signal_hook_mio::v1_0 as signal_hook_mio; + use anyhow::{bail, Context, Result}; use derive_builder::Builder; use log::{error, info, warn}; -use mio::{Interest, Token}; use zerocopy::AsBytes; use rosenpass_util::attempt; +use rosenpass_util::fmt::debug::NullDebug; use rosenpass_util::functional::{run, ApplyExt}; use rosenpass_util::io::{IoResultKindHintExt, SubstituteForIoErrorKindExt}; use rosenpass_util::{ - b64::B64Display, build::ConstructionSite, file::StoreValueB64, option::SomeExt, result::OkExt, + b64::B64Display, build::ConstructionSite, file::StoreValueB64, result::OkExt, }; use rosenpass_secret_memory::{Public, Secret}; @@ -286,12 +289,20 @@ pub enum AppServerIoSource { Socket(usize), /// IO source refers to a PSK broker in [AppServer::brokers] PskBroker(Public), + /// IO source refers to our signal handlers + SignalHandler, /// IO source refers to some IO sources used in the API; /// see [AppServer::api_manager] #[cfg(feature = "experiment_api")] MioManager(crate::api::mio::MioManagerIoSource), } +pub enum AppServerTryRecvResult { + None, + Terminate, + NetworkMessage(usize, Endpoint), +} + /// Number of epoll(7) events Rosenpass can receive at a time const EVENT_CAPACITY: usize = 20; @@ -332,6 +343,8 @@ pub struct AppServer { /// MIO associates IO sources with numeric tokens. This struct takes care of generating these /// tokens pub mio_token_dispenser: MioTokenDispenser, + /// Mio-based handler for signals + pub signal_handler: NullDebug, /// Helpers handling communication with WireGuard; these take a generated key and forward it to /// WireGuard pub brokers: BrokerStore, @@ -357,16 +370,6 @@ pub struct AppServer { /// Used by integration tests to force [Self] into DoS condition /// and to terminate the AppServer after the test is complete pub test_helpers: Option, - /// Helper for integration tests running rosenpass as a subprocess - /// to terminate properly upon receiving an appropriate system signal. - /// - /// This is primarily needed for coverage testing, since llvm-cov does not - /// write coverage reports to disk when a process is stopped by the default - /// signal handler. - /// - /// See - #[cfg(feature = "internal_signal_handling_for_coverage_reports")] - pub term_signal: terminate::TerminateRequested, #[cfg(feature = "experiment_api")] /// The Rosenpass unix socket API handler; this is an experimental /// feature that can be used to embed Rosenpass in external applications @@ -456,6 +459,8 @@ impl AppPeerPtr { /// Instructs [AppServer::event_loop_without_error_handling] on how to proceed. #[derive(Debug)] pub enum AppPollResult { + /// Received request to terminate the application + Terminate, /// Erase the key for a given peer. Corresponds to [crate::protocol::PollResult::DeleteKey] DeleteKey(AppPeerPtr), /// Send an initiation to the given peer. Corresponds to [crate::protocol::PollResult::SendInitiation] @@ -802,10 +807,27 @@ impl AppServer { verbosity: Verbosity, test_helpers: Option, ) -> anyhow::Result { - // setup mio + // Setup Mio itself let mio_poll = mio::Poll::new()?; let events = mio::Events::with_capacity(EVENT_CAPACITY); + + // And helpers to map mio tokens to internal event types let mut mio_token_dispenser = MioTokenDispenser::default(); + let mut io_source_index = HashMap::new(); + + // Setup signal handling + let signal_handler = attempt!({ + let mut handler = + signal_hook_mio::Signals::new(signal_hook::consts::TERM_SIGNALS.iter())?; + let mio_token = mio_token_dispenser.dispense(); + mio_poll + .registry() + .register(&mut handler, mio_token, Interest::READABLE)?; + let prev = io_source_index.insert(mio_token, AppServerIoSource::SignalHandler); + assert!(prev.is_none()); + Ok(NullDebug(handler)) + }) + .context("Failed to set up signal (user triggered program termination) handler")?; // bind each SocketAddr to a socket let maybe_sockets: Result, _> = @@ -879,7 +901,6 @@ impl AppServer { } // register all sockets to mio - let mut io_source_index = HashMap::new(); for (idx, socket) in sockets.iter_mut().enumerate() { let mio_token = mio_token_dispenser.dispense(); mio_poll @@ -895,8 +916,6 @@ impl AppServer { }; Ok(Self { - #[cfg(feature = "internal_signal_handling_for_coverage_reports")] - term_signal: terminate::TerminateRequested::new()?, crypto_site, peers: Vec::new(), verbosity, @@ -907,6 +926,7 @@ impl AppServer { io_source_index, mio_poll, mio_token_dispenser, + signal_handler, brokers: BrokerStore::default(), all_sockets_drained: false, under_load: DoSOperation::Normal, @@ -1049,7 +1069,7 @@ impl AppServer { Ok(AppPeerPtr(pn)) } - /// Main IO handler; this generally does not terminate + /// Main IO handler; this generally does not terminate other than through unix signals /// /// # Examples /// @@ -1066,23 +1086,6 @@ impl AppServer { Err(e) => e, }; - #[cfg(feature = "internal_signal_handling_for_coverage_reports")] - { - let terminated_by_signal = err - .downcast_ref::() - .filter(|e| e.kind() == std::io::ErrorKind::Interrupted) - .filter(|_| self.term_signal.value()) - .is_some(); - if terminated_by_signal { - log::warn!( - "\ - Terminated by signal; this signal handler is correct during coverage testing \ - but should be otherwise disabled" - ); - return Ok(()); - } - } - // This should not happen… failure_cnt = if msgs_processed > 0 { 0 @@ -1135,6 +1138,7 @@ impl AppServer { use AppPollResult::*; use KeyOutputReason::*; + // TODO: We should read from this using a mio channel if let Some(AppServerTest { termination_handler: Some(terminate), .. @@ -1158,6 +1162,8 @@ impl AppServer { #[allow(clippy::redundant_closure_call)] match (have_crypto, poll_result) { + (_, Terminate) => return Ok(()), + (CryptoSrv::Missing, SendInitiation(_)) => {} (CryptoSrv::Avail, SendInitiation(peer)) => tx_maybe_with!(peer, || self .crypto_server_mut()? @@ -1305,6 +1311,7 @@ impl AppServer { pub fn poll(&mut self, rx_buf: &mut [u8]) -> anyhow::Result { use crate::protocol::PollResult as C; use AppPollResult as A; + use AppServerTryRecvResult as R; let res = loop { // Call CryptoServer's poll (if available) let crypto_poll = self @@ -1325,8 +1332,10 @@ impl AppServer { }; // Perform IO (look for a message) - if let Some((len, addr)) = self.try_recv(rx_buf, io_poll_timeout)? { - break A::ReceivedMessage(len, addr); + match self.try_recv(rx_buf, io_poll_timeout)? { + R::None => {} + R::Terminate => break A::Terminate, + R::NetworkMessage(len, addr) => break A::ReceivedMessage(len, addr), } }; @@ -1344,12 +1353,12 @@ impl AppServer { &mut self, buf: &mut [u8], timeout: Timing, - ) -> anyhow::Result> { + ) -> anyhow::Result { let timeout = Duration::from_secs_f64(timeout); // if there is no time to wait on IO, well, then, lets not waste any time! if timeout.is_zero() { - return Ok(None); + return Ok(AppServerTryRecvResult::None); } // NOTE when using mio::Poll, there are some particularities (taken from @@ -1459,12 +1468,19 @@ impl AppServer { // blocking poll, we go through all available IO sources to see if we missed anything. { while let Some(ev) = self.short_poll_queue.pop_front() { - if let Some(v) = self.try_recv_from_mio_token(buf, ev.token())? { - return Ok(Some(v)); + match self.try_recv_from_mio_token(buf, ev.token())? { + AppServerTryRecvResult::None => continue, + res => return Ok(res), } } } + // Drain operating system signals + match self.try_recv_from_signal_handler()? { + AppServerTryRecvResult::None => {} // Nop + res => return Ok(res), + } + // drain all sockets let mut would_block_count = 0; for sock_no in 0..self.sockets.len() { @@ -1472,11 +1488,11 @@ impl AppServer { .try_recv_from_listen_socket(buf, sock_no) .io_err_kind_hint() { - Ok(None) => continue, - Ok(Some(v)) => { + Ok(AppServerTryRecvResult::None) => continue, + Ok(res) => { // at least one socket was not drained... self.all_sockets_drained = false; - return Ok(Some(v)); + return Ok(res); } Err((_, ErrorKind::WouldBlock)) => { would_block_count += 1; @@ -1504,12 +1520,24 @@ impl AppServer { self.performed_long_poll = true; - Ok(None) + Ok(AppServerTryRecvResult::None) } /// Internal helper for [Self::try_recv] fn perform_mio_poll_and_register_events(&mut self, timeout: Duration) -> io::Result<()> { - self.mio_poll.poll(&mut self.events, Some(timeout))?; + loop { + use std::io::ErrorKind as IOE; + match self + .mio_poll + .poll(&mut self.events, Some(timeout)) + .io_err_kind_hint() + { + Ok(()) => break, + Err((_, IOE::Interrupted)) => continue, + Err((err, _)) => return Err(err), + } + } + // Fill the short poll buffer with the acquired events self.events .iter() @@ -1523,12 +1551,12 @@ impl AppServer { &mut self, buf: &mut [u8], token: mio::Token, - ) -> anyhow::Result> { + ) -> anyhow::Result { let io_source = match self.io_source_index.get(&token) { Some(io_source) => *io_source, None => { log::warn!("No IO source assiociated with mio token ({token:?}). Polling using mio tokens directly is an experimental feature and IO handler should recover when all available io sources are polled. This is a developer error. Please report it."); - return Ok(None); + return Ok(AppServerTryRecvResult::None); } }; @@ -1540,11 +1568,13 @@ impl AppServer { &mut self, buf: &mut [u8], io_source: AppServerIoSource, - ) -> anyhow::Result> { + ) -> anyhow::Result { match io_source { + AppServerIoSource::SignalHandler => self.try_recv_from_signal_handler()?.ok(), + AppServerIoSource::Socket(idx) => self .try_recv_from_listen_socket(buf, idx) - .substitute_for_ioerr_wouldblock(None)? + .substitute_for_ioerr_wouldblock(AppServerTryRecvResult::None)? .ok(), AppServerIoSource::PskBroker(key) => self @@ -1553,7 +1583,7 @@ impl AppServer { .get_mut(&key) .with_context(|| format!("No PSK broker under key {key:?}"))? .process_poll() - .map(|_| None), + .map(|_| AppServerTryRecvResult::None), #[cfg(feature = "experiment_api")] AppServerIoSource::MioManager(mmio_src) => { @@ -1561,17 +1591,28 @@ impl AppServer { MioManagerFocus(self) .poll_particular(mmio_src) - .map(|_| None) + .map(|_| AppServerTryRecvResult::None) } } } + /// Internal helper for [Self::try_recv] + fn try_recv_from_signal_handler(&mut self) -> io::Result { + #[allow(clippy::never_loop)] + for signal in self.signal_handler.pending() { + log::debug!("Received operating system signal no {signal}."); + log::info!("Received termination request; exiting."); + return Ok(AppServerTryRecvResult::Terminate); + } + Ok(AppServerTryRecvResult::None) + } + /// Internal helper for [Self::try_recv] fn try_recv_from_listen_socket( &mut self, buf: &mut [u8], idx: usize, - ) -> io::Result> { + ) -> io::Result { use std::io::ErrorKind as K; let (n, addr) = loop { match self.sockets[idx].recv_from(buf).io_err_kind_hint() { @@ -1583,8 +1624,7 @@ impl AppServer { SocketPtr(idx) .apply(|sp| SocketBoundEndpoint::new(sp, addr)) .apply(Endpoint::SocketBoundAddress) - .apply(|ep| (n, ep)) - .some() + .apply(|ep| AppServerTryRecvResult::NetworkMessage(n, ep)) .ok() } @@ -1636,48 +1676,3 @@ impl crate::api::mio::MioManagerContext for MioManagerFocus<'_> { self.0 } } - -/// These signal handlers are used exclusively used during coverage testing -/// to ensure that the llvm-cov can produce reports during integration tests -/// with multiple processes where subprocesses are terminated via kill(2). -/// -/// llvm-cov does not support producing coverage reports when the process exits -/// through a signal, so this is necessary. -/// -/// The functionality of exiting gracefully upon reception of a terminating signal -/// is desired for the production variant of Rosenpass, but we should make sure -/// to use a higher quality implementation; in particular, we should use signalfd(2). -/// -#[cfg(feature = "internal_signal_handling_for_coverage_reports")] -mod terminate { - use signal_hook::flag::register as sig_register; - use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }; - - /// Automatically register a signal handler for common termination signals; - /// whether one of these signals was issued can be polled using [Self::value]. - /// - /// The signal handler is not removed when this struct goes out of scope. - #[derive(Debug)] - pub struct TerminateRequested { - value: Arc, - } - - impl TerminateRequested { - /// Register signal handlers watching for common termination signals - pub fn new() -> anyhow::Result { - let value = Arc::new(AtomicBool::new(false)); - for sig in signal_hook::consts::TERM_SIGNALS.iter().copied() { - sig_register(sig, Arc::clone(&value))?; - } - Ok(Self { value }) - } - - /// Check whether a termination signal has been set - pub fn value(&self) -> bool { - self.value.load(Ordering::Relaxed) - } - } -} diff --git a/supply-chain/config.toml b/supply-chain/config.toml index a261f9a..616f97d 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -630,7 +630,11 @@ version = "3.2.0" criteria = "safe-to-run" [[exemptions.signal-hook]] -version = "0.3.17" +version = "0.3.18" +criteria = "safe-to-deploy" + +[[exemptions.signal-hook-mio]] +version = "0.2.4" criteria = "safe-to-deploy" [[exemptions.signal-hook-registry]] diff --git a/util/src/fmt/debug.rs b/util/src/fmt/debug.rs new file mode 100644 index 0000000..839b159 --- /dev/null +++ b/util/src/fmt/debug.rs @@ -0,0 +1,82 @@ +//! Helpers for string formatting with the debug formatter; extensions for [std::fmt::Debug] + +use std::any::type_name; +use std::borrow::{Borrow, BorrowMut}; +use std::ops::{Deref, DerefMut}; + +/// Debug formatter which just prints the type name; +/// used to wrap values which do not support the Debug +/// trait themselves +/// +/// # Examples +/// +/// ```rust +/// use rosenpass_util::fmt::debug::NullDebug; +/// +/// // Does not implement debug +/// struct NoDebug; +/// +/// #[derive(Debug)] +/// struct ShouldSupportDebug { +/// #[allow(dead_code)] +/// no_debug: NullDebug, +/// } +/// +/// let val = ShouldSupportDebug { +/// no_debug: NullDebug(NoDebug), +/// }; +/// ``` +pub struct NullDebug(pub T); + +impl std::fmt::Debug for NullDebug { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("NullDebug<")?; + f.write_str(type_name::())?; + f.write_str(">")?; + Ok(()) + } +} + +impl From for NullDebug { + fn from(value: T) -> Self { + NullDebug(value) + } +} + +impl Deref for NullDebug { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0.borrow() + } +} + +impl DerefMut for NullDebug { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.borrow_mut() + } +} + +impl Borrow for NullDebug { + fn borrow(&self) -> &T { + self.deref() + } +} + +impl BorrowMut for NullDebug { + fn borrow_mut(&mut self) -> &mut T { + self.deref_mut() + } +} + +impl AsRef for NullDebug { + fn as_ref(&self) -> &T { + self.deref() + } +} + +impl AsMut for NullDebug { + fn as_mut(&mut self) -> &mut T { + self.deref_mut() + } +} diff --git a/util/src/fmt/mod.rs b/util/src/fmt/mod.rs new file mode 100644 index 0000000..0d35b06 --- /dev/null +++ b/util/src/fmt/mod.rs @@ -0,0 +1,3 @@ +//! Helpers for string formatting; extensions for [std::fmt] + +pub mod debug; diff --git a/util/src/lib.rs b/util/src/lib.rs index af70d47..69d7698 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -14,6 +14,7 @@ pub mod controlflow; pub mod fd; /// File system operations and handling. pub mod file; +pub mod fmt; /// Functional programming utilities. pub mod functional; /// Input/output operations.