From e7de4848fbc425b6a10846911846afb9440efc46 Mon Sep 17 00:00:00 2001 From: Prabhpreet Dua <615318+prabhpreet@users.noreply.github.com> Date: Tue, 16 Apr 2024 09:22:08 +0530 Subject: [PATCH] Try threads instead of process --- Cargo.lock | 73 +++++++++++++++++++++++++++++ Cargo.toml | 1 + rosenpass/Cargo.toml | 3 +- rosenpass/src/app_server.rs | 33 ++++++++++--- rosenpass/src/cli.rs | 15 +++--- rosenpass/src/main.rs | 4 +- rosenpass/tests/integration_test.rs | 26 +++++++--- 7 files changed, 132 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b85feda..4684261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -541,6 +541,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -565,6 +600,37 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -854,6 +920,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indexmap" version = "1.9.3" @@ -1464,6 +1536,7 @@ dependencies = [ "anyhow", "clap 4.4.10", "criterion", + "derive_builder", "env_logger", "home", "log", diff --git a/Cargo.toml b/Cargo.toml index 347a327..5872f6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,3 +61,4 @@ zerocopy = { version = "0.7.32", features = ["derive"] } home = "0.5.9" procspawn = "1.0.0" serial_test = "3.0.0" +derive_builder = "0.20.0" diff --git a/rosenpass/Cargo.toml b/rosenpass/Cargo.toml index 1417f0a..ff12230 100644 --- a/rosenpass/Cargo.toml +++ b/rosenpass/Cargo.toml @@ -38,6 +38,7 @@ mio = { workspace = true } rand = { workspace = true } zerocopy = { workspace = true } home = { workspace = true } +derive_builder = {workspace = true} [build-dependencies] anyhow = { workspace = true } @@ -47,4 +48,4 @@ criterion = { workspace = true } test_bin = { workspace = true } stacker = { workspace = true } procspawn = {workspace = true} -serial_test = {workspace = true} \ No newline at end of file +serial_test = {workspace = true} diff --git a/rosenpass/src/app_server.rs b/rosenpass/src/app_server.rs index 1663a6e..d33dc5a 100644 --- a/rosenpass/src/app_server.rs +++ b/rosenpass/src/app_server.rs @@ -1,6 +1,7 @@ use anyhow::bail; use anyhow::Result; +use derive_builder::Builder; use log::{debug, error, info, warn}; use mio::Interest; use mio::Token; @@ -76,11 +77,14 @@ pub enum DoSOperation { UnderLoad, Normal, } -/// Integration test flags for AppServer -#[derive(Debug, Default)] -pub struct AppServerTestFlags { +/// Integration test helpers for AppServer +#[derive(Debug, Builder)] +#[builder(pattern = "owned")] +pub struct AppServerTest { /// Enable DoS operation permanently pub enable_dos_permanently: bool, + /// Terminate application signal + pub terminate: Option>, } /// Holds the state of the application, namely the external IO @@ -101,7 +105,7 @@ pub struct AppServer { pub non_blocking_polls_count: usize, pub unpolled_count: usize, pub last_update_time: Instant, - pub test_flags: AppServerTestFlags, + pub test_helpers: Option, } /// A socket pointer is an index assigned to a socket; @@ -408,7 +412,7 @@ impl AppServer { pk: SPk, addrs: Vec, verbosity: Verbosity, - test_flags: AppServerTestFlags, + test_helpers: Option, ) -> anyhow::Result { // setup mio let mio_poll = mio::Poll::new()?; @@ -507,7 +511,7 @@ impl AppServer { non_blocking_polls_count: 0, unpolled_count: 0, last_update_time: Instant::now(), - test_flags, + test_helpers, }) } @@ -598,6 +602,17 @@ impl AppServer { use crate::protocol::HandleMsgResult; use AppPollResult::*; use KeyOutputReason::*; + + if let Some(AppServerTest { + terminate: Some(terminate), + .. + }) = &self.test_helpers + { + if let Ok(_) = terminate.try_recv() { + return Ok(()); + } + } + match self.poll(&mut *rx)? { #[allow(clippy::redundant_closure_call)] SendInitiation(peer) => tx_maybe_with!(peer, || self @@ -819,7 +834,11 @@ impl AppServer { self.unpolled_count += 1; } - if self.test_flags.enable_dos_permanently { + if let Some(AppServerTest { + enable_dos_permanently: true, + .. + }) = self.test_helpers + { self.under_load = DoSOperation::UnderLoad; } else { //Reset blocking poll count if waiting for more than BLOCKING_POLL_COUNT_DURATION diff --git a/rosenpass/src/cli.rs b/rosenpass/src/cli.rs index 020e480..698247c 100644 --- a/rosenpass/src/cli.rs +++ b/rosenpass/src/cli.rs @@ -7,7 +7,7 @@ use rosenpass_util::file::{LoadValue, LoadValueB64}; use std::path::PathBuf; use crate::app_server::AppServer; -use crate::app_server::{self, AppServerTestFlags}; +use crate::app_server::{self, AppServerTest}; use crate::protocol::{SPk, SSk, SymKey}; use super::config; @@ -150,7 +150,7 @@ impl CliCommand { /// /// ## TODO /// - This method consumes the [`CliCommand`] value. It might be wise to use a reference... - pub fn run(self, test_flags: AppServerTestFlags) -> anyhow::Result<()> { + pub fn run(self, test_helpers: Option) -> anyhow::Result<()> { use CliCommand::*; match self { Man => { @@ -257,7 +257,7 @@ impl CliCommand { let config = config::Rosenpass::load(config_file)?; config.validate()?; - Self::event_loop(config, test_flags)?; + Self::event_loop(config, test_helpers)?; } Exchange { @@ -274,7 +274,7 @@ impl CliCommand { config.config_file_path = p; } config.validate()?; - Self::event_loop(config, test_flags)?; + Self::event_loop(config, test_helpers)?; } Validate { config_files } => { @@ -296,7 +296,10 @@ impl CliCommand { Ok(()) } - fn event_loop(config: config::Rosenpass, test_flags: AppServerTestFlags) -> anyhow::Result<()> { + fn event_loop( + config: config::Rosenpass, + test_helpers: Option, + ) -> anyhow::Result<()> { // load own keys let sk = SSk::load(&config.secret_key)?; let pk = SPk::load(&config.public_key)?; @@ -307,7 +310,7 @@ impl CliCommand { pk, config.listen, config.verbosity, - test_flags, + test_helpers, )?); for cfg_peer in config.peers { diff --git a/rosenpass/src/main.rs b/rosenpass/src/main.rs index 4375d93..acf2850 100644 --- a/rosenpass/src/main.rs +++ b/rosenpass/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; use log::error; -use rosenpass::{app_server::AppServerTestFlags, cli::CliArgs}; +use rosenpass::cli::CliArgs; use std::process::exit; /// Catches errors, prints them through the logger, then exits @@ -26,7 +26,7 @@ pub fn main() { // error!("error dummy"); } - match args.command.run(AppServerTestFlags::default()) { + match args.command.run(None) { Ok(_) => {} Err(e) => { error!("{e}"); diff --git a/rosenpass/tests/integration_test.rs b/rosenpass/tests/integration_test.rs index 31554c7..0a895a9 100644 --- a/rosenpass/tests/integration_test.rs +++ b/rosenpass/tests/integration_test.rs @@ -1,7 +1,13 @@ -use std::{fs, net::UdpSocket, path::PathBuf, process::Stdio, time::Duration}; +use std::{ + fs, net::UdpSocket, os::unix::thread::JoinHandleExt, path::PathBuf, process::Stdio, + time::Duration, +}; use clap::Parser; -use rosenpass::{app_server::AppServerTestFlags, cli::CliArgs}; +use rosenpass::{ + app_server::{AppServerTest, AppServerTestBuilder}, + cli::CliArgs, +}; use serial_test::serial; const BIN: &str = "rosenpass"; @@ -190,12 +196,18 @@ fn check_exchange_under_dos() { acc }); - let mut server = procspawn::spawn(server_cmd, |server_cmd: Vec| { + let (server_terminate, server_terminate_rx) = std::sync::mpsc::channel(); + + let mut server = std::thread::spawn(move || { let cli = CliArgs::try_parse_from(server_cmd.iter()).unwrap(); cli.command - .run(AppServerTestFlags { - enable_dos_permanently: true, - }) + .run(Some( + AppServerTestBuilder::default() + .enable_dos_permanently(true) + .terminate(Some(server_terminate_rx)) + .build() + .unwrap(), + )) .unwrap(); }); @@ -219,7 +231,7 @@ fn check_exchange_under_dos() { std::thread::sleep(Duration::from_secs(10)); // time's up, kill the childs - server.kill().unwrap(); + server_terminate.send(()).unwrap(); client.kill().unwrap(); // read the shared keys they created