Try threads instead of process

This commit is contained in:
Prabhpreet Dua
2024-04-16 09:22:08 +05:30
parent 92824bb5b0
commit e7de4848fb
7 changed files with 132 additions and 23 deletions

73
Cargo.lock generated
View File

@@ -541,6 +541,41 @@ dependencies = [
"typenum",
]
[[package]]
name = "darling"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "dashmap"
version = "5.5.3"
@@ -565,6 +600,37 @@ dependencies = [
"syn",
]
[[package]]
name = "derive_builder"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "derive_builder_macro"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b"
dependencies = [
"derive_builder_core",
"syn",
]
[[package]]
name = "digest"
version = "0.10.7"
@@ -854,6 +920,12 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "indexmap"
version = "1.9.3"
@@ -1464,6 +1536,7 @@ dependencies = [
"anyhow",
"clap 4.4.10",
"criterion",
"derive_builder",
"env_logger",
"home",
"log",

View File

@@ -61,3 +61,4 @@ zerocopy = { version = "0.7.32", features = ["derive"] }
home = "0.5.9"
procspawn = "1.0.0"
serial_test = "3.0.0"
derive_builder = "0.20.0"

View File

@@ -38,6 +38,7 @@ mio = { workspace = true }
rand = { workspace = true }
zerocopy = { workspace = true }
home = { workspace = true }
derive_builder = {workspace = true}
[build-dependencies]
anyhow = { workspace = true }

View File

@@ -1,6 +1,7 @@
use anyhow::bail;
use anyhow::Result;
use derive_builder::Builder;
use log::{debug, error, info, warn};
use mio::Interest;
use mio::Token;
@@ -76,11 +77,14 @@ pub enum DoSOperation {
UnderLoad,
Normal,
}
/// Integration test flags for AppServer
#[derive(Debug, Default)]
pub struct AppServerTestFlags {
/// Integration test helpers for AppServer
#[derive(Debug, Builder)]
#[builder(pattern = "owned")]
pub struct AppServerTest {
/// Enable DoS operation permanently
pub enable_dos_permanently: bool,
/// Terminate application signal
pub terminate: Option<std::sync::mpsc::Receiver<()>>,
}
/// Holds the state of the application, namely the external IO
@@ -101,7 +105,7 @@ pub struct AppServer {
pub non_blocking_polls_count: usize,
pub unpolled_count: usize,
pub last_update_time: Instant,
pub test_flags: AppServerTestFlags,
pub test_helpers: Option<AppServerTest>,
}
/// A socket pointer is an index assigned to a socket;
@@ -408,7 +412,7 @@ impl AppServer {
pk: SPk,
addrs: Vec<SocketAddr>,
verbosity: Verbosity,
test_flags: AppServerTestFlags,
test_helpers: Option<AppServerTest>,
) -> anyhow::Result<Self> {
// setup mio
let mio_poll = mio::Poll::new()?;
@@ -507,7 +511,7 @@ impl AppServer {
non_blocking_polls_count: 0,
unpolled_count: 0,
last_update_time: Instant::now(),
test_flags,
test_helpers,
})
}
@@ -598,6 +602,17 @@ impl AppServer {
use crate::protocol::HandleMsgResult;
use AppPollResult::*;
use KeyOutputReason::*;
if let Some(AppServerTest {
terminate: Some(terminate),
..
}) = &self.test_helpers
{
if let Ok(_) = terminate.try_recv() {
return Ok(());
}
}
match self.poll(&mut *rx)? {
#[allow(clippy::redundant_closure_call)]
SendInitiation(peer) => tx_maybe_with!(peer, || self
@@ -819,7 +834,11 @@ impl AppServer {
self.unpolled_count += 1;
}
if self.test_flags.enable_dos_permanently {
if let Some(AppServerTest {
enable_dos_permanently: true,
..
}) = self.test_helpers
{
self.under_load = DoSOperation::UnderLoad;
} else {
//Reset blocking poll count if waiting for more than BLOCKING_POLL_COUNT_DURATION

View File

@@ -7,7 +7,7 @@ use rosenpass_util::file::{LoadValue, LoadValueB64};
use std::path::PathBuf;
use crate::app_server::AppServer;
use crate::app_server::{self, AppServerTestFlags};
use crate::app_server::{self, AppServerTest};
use crate::protocol::{SPk, SSk, SymKey};
use super::config;
@@ -150,7 +150,7 @@ impl CliCommand {
///
/// ## TODO
/// - This method consumes the [`CliCommand`] value. It might be wise to use a reference...
pub fn run(self, test_flags: AppServerTestFlags) -> anyhow::Result<()> {
pub fn run(self, test_helpers: Option<AppServerTest>) -> anyhow::Result<()> {
use CliCommand::*;
match self {
Man => {
@@ -257,7 +257,7 @@ impl CliCommand {
let config = config::Rosenpass::load(config_file)?;
config.validate()?;
Self::event_loop(config, test_flags)?;
Self::event_loop(config, test_helpers)?;
}
Exchange {
@@ -274,7 +274,7 @@ impl CliCommand {
config.config_file_path = p;
}
config.validate()?;
Self::event_loop(config, test_flags)?;
Self::event_loop(config, test_helpers)?;
}
Validate { config_files } => {
@@ -296,7 +296,10 @@ impl CliCommand {
Ok(())
}
fn event_loop(config: config::Rosenpass, test_flags: AppServerTestFlags) -> anyhow::Result<()> {
fn event_loop(
config: config::Rosenpass,
test_helpers: Option<AppServerTest>,
) -> anyhow::Result<()> {
// load own keys
let sk = SSk::load(&config.secret_key)?;
let pk = SPk::load(&config.public_key)?;
@@ -307,7 +310,7 @@ impl CliCommand {
pk,
config.listen,
config.verbosity,
test_flags,
test_helpers,
)?);
for cfg_peer in config.peers {

View File

@@ -1,6 +1,6 @@
use clap::Parser;
use log::error;
use rosenpass::{app_server::AppServerTestFlags, cli::CliArgs};
use rosenpass::cli::CliArgs;
use std::process::exit;
/// Catches errors, prints them through the logger, then exits
@@ -26,7 +26,7 @@ pub fn main() {
// error!("error dummy");
}
match args.command.run(AppServerTestFlags::default()) {
match args.command.run(None) {
Ok(_) => {}
Err(e) => {
error!("{e}");

View File

@@ -1,7 +1,13 @@
use std::{fs, net::UdpSocket, path::PathBuf, process::Stdio, time::Duration};
use std::{
fs, net::UdpSocket, os::unix::thread::JoinHandleExt, path::PathBuf, process::Stdio,
time::Duration,
};
use clap::Parser;
use rosenpass::{app_server::AppServerTestFlags, cli::CliArgs};
use rosenpass::{
app_server::{AppServerTest, AppServerTestBuilder},
cli::CliArgs,
};
use serial_test::serial;
const BIN: &str = "rosenpass";
@@ -190,12 +196,18 @@ fn check_exchange_under_dos() {
acc
});
let mut server = procspawn::spawn(server_cmd, |server_cmd: Vec<String>| {
let (server_terminate, server_terminate_rx) = std::sync::mpsc::channel();
let mut server = std::thread::spawn(move || {
let cli = CliArgs::try_parse_from(server_cmd.iter()).unwrap();
cli.command
.run(AppServerTestFlags {
enable_dos_permanently: true,
})
.run(Some(
AppServerTestBuilder::default()
.enable_dos_permanently(true)
.terminate(Some(server_terminate_rx))
.build()
.unwrap(),
))
.unwrap();
});
@@ -219,7 +231,7 @@ fn check_exchange_under_dos() {
std::thread::sleep(Duration::from_secs(10));
// time's up, kill the childs
server.kill().unwrap();
server_terminate.send(()).unwrap();
client.kill().unwrap();
// read the shared keys they created