diff --git a/Cargo.lock b/Cargo.lock index f53b0c2..b40dee6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1123,6 +1123,7 @@ dependencies = [ "clap 4.4.10", "criterion", "env_logger", + "libc", "log", "memoffset", "mio", @@ -1292,9 +1293,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "semver" -version = "1.0.21" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" diff --git a/Cargo.toml b/Cargo.toml index 35d7386..c6657ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,3 +59,4 @@ mio = { version = "0.8.9", features = ["net", "os-poll"] } oqs-sys = { version = "0.8", default-features = false, features = ['classic_mceliece', 'kyber'] } blake2 = "0.10.6" chacha20poly1305 = { version = "0.10.1", default-features = false, features = [ "std", "heapless" ] } +libc = "0.2" \ No newline at end of file diff --git a/rosenpass/Cargo.toml b/rosenpass/Cargo.toml index 4523c1f..6741e54 100644 --- a/rosenpass/Cargo.toml +++ b/rosenpass/Cargo.toml @@ -33,6 +33,7 @@ toml = { workspace = true } clap = { workspace = true } mio = { workspace = true } rand = { workspace = true } +libc={workspace = true,optional=true} [build-dependencies] anyhow = { workspace = true } @@ -40,4 +41,7 @@ anyhow = { workspace = true } [dev-dependencies] criterion = { workspace = true } test_bin = { workspace = true } -stacker = { workspace = true } \ No newline at end of file +stacker = { workspace = true } + +[features] +integration_test = ["dep:libc"] diff --git a/rosenpass/src/app_server.rs b/rosenpass/src/app_server.rs index d0876e7..7548a30 100644 --- a/rosenpass/src/app_server.rs +++ b/rosenpass/src/app_server.rs @@ -34,11 +34,14 @@ use rosenpass_util::b64::{b64_writer, fmt_b64}; const IPV4_ANY_ADDR: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0); const IPV6_ANY_ADDR: Ipv6Addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0); -const NORMAL_OPERATION_THRESHOLD: usize = 5; -const UNDER_LOAD_THRESHOLD: usize = 10; -const RESET_DURATION: Duration = Duration::from_secs(1); - -const LAST_UNDER_LOAD_WINDOW: Duration = Duration::from_secs(1); +#[cfg(feature = "integration_test")] +const UNDER_LOAD_RATIO: f64 = 0.001; +#[cfg(feature = "integration_test")] +const DURATION_UPDATE_UNDER_LOAD_STATUS: Duration = Duration::from_millis(10); +#[cfg(not(feature = "integration_test"))] +const UNDER_LOAD_RATIO: f64 = 0.5; +#[cfg(not(feature = "integration_test"))] +const DURATION_UPDATE_UNDER_LOAD_STATUS: Duration = Duration::from_millis(100); fn ipv4_any_binding() -> SocketAddr { // addr, port @@ -74,10 +77,10 @@ pub struct WireguardOut { pub extra_params: Vec, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum DoSOperation { - UnderLoad { last_under_load: Instant }, - Normal{blocked_polls: usize}, + UnderLoad, + Normal, } /// Holds the state of the application, namely the external IO @@ -94,6 +97,10 @@ pub struct AppServer { pub verbosity: Verbosity, pub all_sockets_drained: bool, pub under_load: DoSOperation, + pub blocking_polls_count: usize, + pub non_blocking_polls_count: usize, + pub unpolled_count: usize, + pub last_update_time: Instant, } /// A socket pointer is an index assigned to a socket; @@ -493,7 +500,11 @@ impl AppServer { events, mio_poll, all_sockets_drained: false, - under_load: DoSOperation::Normal{ blocked_polls: 0}, + under_load: DoSOperation::Normal, + blocking_polls_count: 0, + non_blocking_polls_count: 0, + unpolled_count: 0, + last_update_time: Instant::now(), }) } @@ -609,14 +620,10 @@ impl AppServer { ReceivedMessage(len, endpoint) => { let msg_result = match self.under_load { - DoSOperation::UnderLoad { last_under_load: _ } => { - println!("Processing msg under load"); + DoSOperation::UnderLoad => { self.handle_msg_under_load(&endpoint, &rx[..len], &mut *tx) } - DoSOperation::Normal { blocked_polls: _} => { - println!("Processing msg normally"); - self.crypt.handle_msg(&rx[..len], &mut *tx) - } + DoSOperation::Normal => self.crypt.handle_msg(&rx[..len], &mut *tx), }; match msg_result { Err(ref e) => { @@ -794,36 +801,63 @@ impl AppServer { // only poll if we drained all sockets before if self.all_sockets_drained { //Non blocked polling - self.mio_poll.poll(&mut self.events, Some(Duration::from_secs(0)))?; + self.mio_poll + .poll(&mut self.events, Some(Duration::from_secs(0)))?; if self.events.iter().peekable().peek().is_none() { - // if there are no events, then we can just return - match self.under_load { - DoSOperation::Normal { blocked_polls } => { - self.under_load = DoSOperation::Normal { - blocked_polls: blocked_polls + 1, - } - } - _ => {} - } - + // if there are no events, then add to blocking poll count + self.blocking_polls_count += 1; + //Execute blocking poll self.mio_poll.poll(&mut self.events, Some(timeout))?; + } else { + self.non_blocking_polls_count += 1; } + } else { + self.unpolled_count += 1; } - match self.under_load { - DoSOperation::Normal { blocked_polls } => { - if blocked_polls > NORMAL_OPERATION_THRESHOLD { - self.under_load = DoSOperation::UnderLoad { - last_under_load: Instant::now(), + //Reset blocking poll count if waiting for more than BLOCKING_POLL_COUNT_DURATION + if self.last_update_time.elapsed() > DURATION_UPDATE_UNDER_LOAD_STATUS { + self.last_update_time = Instant::now(); + let total_polls = self.blocking_polls_count + self.non_blocking_polls_count; + + let load_ratio = if total_polls > 0 { + self.non_blocking_polls_count as f64 / total_polls as f64 + } else if self.unpolled_count > 0 { + //There are no polls, so we are under load + 1.0 + } else { + 0.0 + }; + + let prev_under_load = self.under_load; + if load_ratio > UNDER_LOAD_RATIO { + self.under_load = DoSOperation::UnderLoad; + //Test feature- if under load goes to normal operation, write to file + #[cfg(feature = "integration_test")] + { + if (prev_under_load == DoSOperation::Normal) { + let sem_name = b"/rp_integration_test_under_dos\0"; + + // Create or open a semaphore + let sem = unsafe { + libc::sem_open(sem_name.as_ptr() as *const i8, libc::O_CREAT, 0o644, 0) + }; + if sem == libc::SEM_FAILED { + panic!("Failed to create or open semaphore"); + } + + // Post semaphore + unsafe { libc::sem_post(sem) }; } } + } else { + self.under_load = DoSOperation::Normal; } - DoSOperation::UnderLoad { last_under_load } => { - if last_under_load.elapsed() > RESET_DURATION { - self.under_load = DoSOperation::Normal { blocked_polls: 0 }; - } - } + + self.blocking_polls_count = 0; + self.non_blocking_polls_count = 0; + self.unpolled_count = 0; } // drain all sockets diff --git a/rosenpass/src/protocol.rs b/rosenpass/src/protocol.rs index 8aa7eac..9932b32 100644 --- a/rosenpass/src/protocol.rs +++ b/rosenpass/src/protocol.rs @@ -886,7 +886,8 @@ impl CryptoServer { let mut rx_cookie = [0u8; COOKIE_SIZE]; let mut rx_mac = [0u8; MAC_SIZE]; let mut rx_sid = [0u8; 4]; - match rx_buf[0].try_into() { + let msg_type = rx_buf[0].try_into(); + match msg_type { Ok(MsgType::InitHello) => { let msg_in = rx_buf.envelope::>()?; expected.copy_from_slice( diff --git a/rosenpass/tests/integration_test.rs b/rosenpass/tests/integration_test.rs index 703594b..553cda3 100644 --- a/rosenpass/tests/integration_test.rs +++ b/rosenpass/tests/integration_test.rs @@ -1,4 +1,4 @@ -use std::{fs, net::UdpSocket, path::PathBuf, process::Stdio, time::Duration}; +use std::{fs, net::UdpSocket, path::PathBuf, process::Stdio, thread::sleep, time::Duration}; const BIN: &str = "rosenpass"; @@ -119,6 +119,7 @@ fn check_exchange_under_normal() { } // check that we can exchange keys +#[cfg(feature = "integration_test")] #[test] fn check_exchange_under_dos() { let tmpdir = PathBuf::from(env!("CARGO_TARGET_TMPDIR")).join("exchange-dos"); @@ -143,6 +144,14 @@ fn check_exchange_under_dos() { assert!(pub_key_path.is_file()); } + //Create a semaphore. The server will unblock this semaphore after it is under load condition. + //There are parameters setup under app_server to remain in load condition once triggered for this test feature. + let sem_name = b"/rp_integration_test_under_dos\0"; + let sem = unsafe { libc::sem_open(sem_name.as_ptr() as *const i8, libc::O_CREAT, 0o644, 1) }; + unsafe { + libc::sem_wait(sem); + } + // start first process, the server let port = find_udp_socket(); let listen_addr = format!("localhost:{port}"); @@ -158,29 +167,56 @@ fn check_exchange_under_dos() { //.stdout(Stdio::null()) //.stderr(Stdio::null()) .spawn() - .expect("Failed to start {BIN}"); + .expect("Test setup failed- Failed to start {server_bin}"); - std::thread::sleep(Duration::from_millis(500)); - - //DoS Sender - //Create a UDP socket + //Create a UDP socket for DOS sender let socket = UdpSocket::bind("127.0.0.1:0").expect("couldn't bind to address"); - //Spawn a thread to send DoS packets let server_addr = listen_addr.clone(); //Create thread safe atomic bool to stop the DoS attack let stop_dos = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let stop_dos_handle = stop_dos.clone(); - + + //Spawn a thread to send DoS packets let dos_attack = std::thread::spawn(move || { while stop_dos.load(std::sync::atomic::Ordering::Relaxed) == false { let buf = [0; 10]; socket .send_to(&buf, &server_addr) .expect("couldn't send data"); + + sleep(Duration::from_micros(10)); } }); + //Wait till we are under load condition for upto 5 seconds + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + let now = std::time::SystemTime::now(); + let timeout_absolute = now + Duration::from_secs(5); + if let Ok(duration) = timeout_absolute.duration_since(std::time::SystemTime::UNIX_EPOCH) { + ts.tv_sec = duration.as_secs() as libc::time_t; + ts.tv_nsec = duration.subsec_nanos() as _; + } else { + panic!("Test setup failed- Failed to calculate timeout for semaphore"); + } + let mut failed_wait = false; + if (unsafe { libc::sem_timedwait(sem, &ts) } == -1) { + failed_wait = true; + } + // Close and unlink the semaphore + if unsafe { libc::sem_close(sem) } == -1 { + panic!("Test setup failed- Failed to close semaphore"); + } + if unsafe { libc::sem_unlink(sem_name.as_ptr() as *const i8) } == -1 { + panic!("Test setup failed- Failed to unlink semaphore"); + } + if failed_wait { + panic!("Failed to wait for semaphore- load condition not reached"); + } + // start second process, the client let mut client = test_bin::get_test_bin(BIN) .args(["exchange", "secret-key"]) @@ -197,8 +233,8 @@ fn check_exchange_under_dos() { .spawn() .expect("Failed to start {BIN}"); - // give them some time to do the key exchange - std::thread::sleep(Duration::from_secs(2)); + // give them some time to do the key exchange under load + std::thread::sleep(Duration::from_secs(10)); // time's up, kill the childs server.kill().unwrap();