mirror of
https://github.com/rosenpass/rosenpass.git
synced 2026-02-27 22:13:12 -08:00
@@ -167,6 +167,8 @@ const EVENT_CAPACITY: usize = 20;
|
|||||||
// TODO add user control via unix domain socket and stdin/stdout
|
// TODO add user control via unix domain socket and stdin/stdout
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AppServer {
|
pub struct AppServer {
|
||||||
|
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
||||||
|
pub term_signal: terminate::TerminateRequested,
|
||||||
pub crypto_site: ConstructionSite<BuildCryptoServer, CryptoServer>,
|
pub crypto_site: ConstructionSite<BuildCryptoServer, CryptoServer>,
|
||||||
pub sockets: Vec<mio::net::UdpSocket>,
|
pub sockets: Vec<mio::net::UdpSocket>,
|
||||||
pub events: mio::Events,
|
pub events: mio::Events,
|
||||||
@@ -633,6 +635,8 @@ impl AppServer {
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
||||||
|
term_signal: terminate::TerminateRequested::new()?,
|
||||||
crypto_site,
|
crypto_site,
|
||||||
peers: Vec::new(),
|
peers: Vec::new(),
|
||||||
verbosity,
|
verbosity,
|
||||||
@@ -754,18 +758,35 @@ impl AppServer {
|
|||||||
Ok(AppPeerPtr(pn))
|
Ok(AppPeerPtr(pn))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn listen_loop(&mut self) -> anyhow::Result<()> {
|
pub fn event_loop(&mut self) -> anyhow::Result<()> {
|
||||||
const INIT_SLEEP: f64 = 0.01;
|
const INIT_SLEEP: f64 = 0.01;
|
||||||
const MAX_FAILURES: i32 = 10;
|
const MAX_FAILURES: i32 = 10;
|
||||||
let mut failure_cnt = 0;
|
let mut failure_cnt = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let msgs_processed = 0usize;
|
let msgs_processed = 0usize;
|
||||||
let err = match self.event_loop() {
|
let err = match self.event_loop_without_error_handling() {
|
||||||
Ok(()) => return Ok(()),
|
Ok(()) => return Ok(()),
|
||||||
Err(e) => e,
|
Err(e) => e,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
||||||
|
{
|
||||||
|
let terminated_by_signal = err
|
||||||
|
.downcast_ref::<std::io::Error>()
|
||||||
|
.filter(|e| e.kind() == std::io::ErrorKind::Interrupted)
|
||||||
|
.filter(|_| self.term_signal.value())
|
||||||
|
.is_some();
|
||||||
|
if terminated_by_signal {
|
||||||
|
log::warn!(
|
||||||
|
"\
|
||||||
|
Terminated by signal; this signal handler is correct during coverage testing \
|
||||||
|
but should be otherwise disabled"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// This should not happen…
|
// This should not happen…
|
||||||
failure_cnt = if msgs_processed > 0 {
|
failure_cnt = if msgs_processed > 0 {
|
||||||
0
|
0
|
||||||
@@ -790,7 +811,7 @@ impl AppServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn event_loop(&mut self) -> anyhow::Result<()> {
|
pub fn event_loop_without_error_handling(&mut self) -> anyhow::Result<()> {
|
||||||
let (mut rx, mut tx) = (MsgBuf::zero(), MsgBuf::zero());
|
let (mut rx, mut tx) = (MsgBuf::zero(), MsgBuf::zero());
|
||||||
|
|
||||||
/// if socket address for peer is known, call closure
|
/// if socket address for peer is known, call closure
|
||||||
@@ -1288,3 +1309,48 @@ impl crate::api::mio::MioManagerContext for MioManagerFocus<'_> {
|
|||||||
self.0
|
self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// These signal handlers are used exclusively used during coverage testing
|
||||||
|
/// to ensure that the llvm-cov can produce reports during integration tests
|
||||||
|
/// with multiple processes where subprocesses are terminated via kill(2).
|
||||||
|
///
|
||||||
|
/// llvm-cov does not support producing coverage reports when the process exits
|
||||||
|
/// through a signal, so this is necessary.
|
||||||
|
///
|
||||||
|
/// The functionality of exiting gracefully upon reception of a terminating signal
|
||||||
|
/// is desired for the production variant of Rosenpass, but we should make sure
|
||||||
|
/// to use a higher quality implementation; in particular, we should use signalfd(2).
|
||||||
|
///
|
||||||
|
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
||||||
|
mod terminate {
|
||||||
|
use signal_hook::flag::register as sig_register;
|
||||||
|
use std::sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Automatically register a signal handler for common termination signals;
|
||||||
|
/// whether one of these signals was issued can be polled using [Self::value].
|
||||||
|
///
|
||||||
|
/// The signal handler is not removed when this struct goes out of scope.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TerminateRequested {
|
||||||
|
value: Arc<AtomicBool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TerminateRequested {
|
||||||
|
/// Register signal handlers watching for common termination signals
|
||||||
|
pub fn new() -> anyhow::Result<Self> {
|
||||||
|
let value = Arc::new(AtomicBool::new(false));
|
||||||
|
for sig in signal_hook::consts::TERM_SIGNALS.iter().copied() {
|
||||||
|
sig_register(sig, Arc::clone(&value))?;
|
||||||
|
}
|
||||||
|
Ok(Self { value })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether a termination signal has been set
|
||||||
|
pub fn value(&self) -> bool {
|
||||||
|
self.value.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ use clap::Parser;
|
|||||||
use clap_mangen::roff::{roman, Roff};
|
use clap_mangen::roff::{roman, Roff};
|
||||||
use log::error;
|
use log::error;
|
||||||
use rosenpass::cli::CliArgs;
|
use rosenpass::cli::CliArgs;
|
||||||
use rosenpass_util::functional::run;
|
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
|
|
||||||
/// Printing custom man sections when generating the man page
|
/// Printing custom man sections when generating the man page
|
||||||
@@ -78,40 +77,13 @@ pub fn main() {
|
|||||||
// error!("error dummy");
|
// error!("error dummy");
|
||||||
}
|
}
|
||||||
|
|
||||||
let res = run(|| {
|
let broker_interface = args.get_broker_interface();
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
match args.run(broker_interface, None) {
|
||||||
let term_signal = terminate::TerminateRequested::new()?;
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
let broker_interface = args.get_broker_interface();
|
error!("{e:?}");
|
||||||
let err = match args.run(broker_interface, None) {
|
exit(1);
|
||||||
Ok(()) => return Ok(()),
|
|
||||||
Err(err) => err,
|
|
||||||
};
|
|
||||||
|
|
||||||
// This is very very hacky and just used for coverage measurement
|
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
|
||||||
{
|
|
||||||
let terminated_by_signal = err
|
|
||||||
.downcast_ref::<std::io::Error>()
|
|
||||||
.filter(|e| e.kind() == std::io::ErrorKind::Interrupted)
|
|
||||||
.filter(|_| term_signal.value())
|
|
||||||
.is_some();
|
|
||||||
if terminated_by_signal {
|
|
||||||
log::warn!(
|
|
||||||
"\
|
|
||||||
Terminated by signal; this signal handler is correct during coverage testing \
|
|
||||||
but should be otherwise disabled"
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(err)
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(e) = res {
|
|
||||||
error!("{e:?}");
|
|
||||||
exit(1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,47 +110,3 @@ Peischl, Stephan Ajuvo, and Lisa Schmidt.";
|
|||||||
/// Custom main page section: Bugs.
|
/// Custom main page section: Bugs.
|
||||||
static BUGS_MAN: &str = r"
|
static BUGS_MAN: &str = r"
|
||||||
The bugs are tracked at https://github.com/rosenpass/rosenpass/issues.";
|
The bugs are tracked at https://github.com/rosenpass/rosenpass/issues.";
|
||||||
|
|
||||||
/// These signal handlers are used exclusively used during coverage testing
|
|
||||||
/// to ensure that the llvm-cov can produce reports during integration tests
|
|
||||||
/// with multiple processes where subprocesses are terminated via kill(2).
|
|
||||||
///
|
|
||||||
/// llvm-cov does not support producing coverage reports when the process exits
|
|
||||||
/// through a signal, so this is necessary.
|
|
||||||
///
|
|
||||||
/// The functionality of exiting gracefully upon reception of a terminating signal
|
|
||||||
/// is desired for the production variant of Rosenpass, but we should make sure
|
|
||||||
/// to use a higher quality implementation; in particular, we should use signalfd(2).
|
|
||||||
///
|
|
||||||
#[cfg(feature = "internal_signal_handling_for_coverage_reports")]
|
|
||||||
mod terminate {
|
|
||||||
use signal_hook::flag::register as sig_register;
|
|
||||||
use std::sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Automatically register a signal handler for common termination signals;
|
|
||||||
/// whether one of these signals was issued can be polled using [Self::value].
|
|
||||||
///
|
|
||||||
/// The signal handler is not removed when this struct goes out of scope.
|
|
||||||
pub struct TerminateRequested {
|
|
||||||
value: Arc<AtomicBool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TerminateRequested {
|
|
||||||
/// Register signal handlers watching for common termination signals
|
|
||||||
pub fn new() -> anyhow::Result<Self> {
|
|
||||||
let value = Arc::new(AtomicBool::new(false));
|
|
||||||
for sig in signal_hook::consts::TERM_SIGNALS.iter().copied() {
|
|
||||||
sig_register(sig, Arc::clone(&value))?;
|
|
||||||
}
|
|
||||||
Ok(Self { value })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check whether a termination signal has been set
|
|
||||||
pub fn value(&self) -> bool {
|
|
||||||
self.value.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -35,8 +35,15 @@ impl Drop for KillChild {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
use rustix::process::{kill_process, Pid, Signal::Term};
|
use rustix::process::{kill_process, Pid, Signal::Term};
|
||||||
let pid = Pid::from_child(&self.0);
|
let pid = Pid::from_child(&self.0);
|
||||||
rustix::process::kill_process(pid, Term).discard_result();
|
// We seriously need to start handling signals with signalfd, our current signal handling
|
||||||
self.0.wait().discard_result();
|
// system is a bit broken; there is probably a few functions that just restart on EINTR
|
||||||
|
// so the signal is absorbed
|
||||||
|
loop {
|
||||||
|
rustix::process::kill_process(pid, Term).discard_result();
|
||||||
|
if self.0.try_wait().unwrap().is_some() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user