Poll based under load with intg test

This commit is contained in:
Prabhpreet Dua
2024-02-04 20:17:28 +05:30
parent d18107b3a9
commit 0b4699e24a
6 changed files with 127 additions and 50 deletions

5
Cargo.lock generated
View File

@@ -1123,6 +1123,7 @@ dependencies = [
"clap 4.4.10", "clap 4.4.10",
"criterion", "criterion",
"env_logger", "env_logger",
"libc",
"log", "log",
"memoffset", "memoffset",
"mio", "mio",
@@ -1292,9 +1293,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.21" version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090"
[[package]] [[package]]
name = "serde" name = "serde"

View File

@@ -59,3 +59,4 @@ mio = { version = "0.8.9", features = ["net", "os-poll"] }
oqs-sys = { version = "0.8", default-features = false, features = ['classic_mceliece', 'kyber'] } oqs-sys = { version = "0.8", default-features = false, features = ['classic_mceliece', 'kyber'] }
blake2 = "0.10.6" blake2 = "0.10.6"
chacha20poly1305 = { version = "0.10.1", default-features = false, features = [ "std", "heapless" ] } chacha20poly1305 = { version = "0.10.1", default-features = false, features = [ "std", "heapless" ] }
libc = "0.2"

View File

@@ -33,6 +33,7 @@ toml = { workspace = true }
clap = { workspace = true } clap = { workspace = true }
mio = { workspace = true } mio = { workspace = true }
rand = { workspace = true } rand = { workspace = true }
libc={workspace = true,optional=true}
[build-dependencies] [build-dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }
@@ -40,4 +41,7 @@ anyhow = { workspace = true }
[dev-dependencies] [dev-dependencies]
criterion = { workspace = true } criterion = { workspace = true }
test_bin = { workspace = true } test_bin = { workspace = true }
stacker = { workspace = true } stacker = { workspace = true }
[features]
integration_test = ["dep:libc"]

View File

@@ -34,11 +34,14 @@ use rosenpass_util::b64::{b64_writer, fmt_b64};
const IPV4_ANY_ADDR: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0); const IPV4_ANY_ADDR: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0);
const IPV6_ANY_ADDR: Ipv6Addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0); const IPV6_ANY_ADDR: Ipv6Addr = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
const NORMAL_OPERATION_THRESHOLD: usize = 5; #[cfg(feature = "integration_test")]
const UNDER_LOAD_THRESHOLD: usize = 10; const UNDER_LOAD_RATIO: f64 = 0.001;
const RESET_DURATION: Duration = Duration::from_secs(1); #[cfg(feature = "integration_test")]
const DURATION_UPDATE_UNDER_LOAD_STATUS: Duration = Duration::from_millis(10);
const LAST_UNDER_LOAD_WINDOW: Duration = Duration::from_secs(1); #[cfg(not(feature = "integration_test"))]
const UNDER_LOAD_RATIO: f64 = 0.5;
#[cfg(not(feature = "integration_test"))]
const DURATION_UPDATE_UNDER_LOAD_STATUS: Duration = Duration::from_millis(100);
fn ipv4_any_binding() -> SocketAddr { fn ipv4_any_binding() -> SocketAddr {
// addr, port // addr, port
@@ -74,10 +77,10 @@ pub struct WireguardOut {
pub extra_params: Vec<String>, pub extra_params: Vec<String>,
} }
#[derive(Debug)] #[derive(Debug, Clone, Copy, PartialEq)]
pub enum DoSOperation { pub enum DoSOperation {
UnderLoad { last_under_load: Instant }, UnderLoad,
Normal{blocked_polls: usize}, Normal,
} }
/// Holds the state of the application, namely the external IO /// Holds the state of the application, namely the external IO
@@ -94,6 +97,10 @@ pub struct AppServer {
pub verbosity: Verbosity, pub verbosity: Verbosity,
pub all_sockets_drained: bool, pub all_sockets_drained: bool,
pub under_load: DoSOperation, pub under_load: DoSOperation,
pub blocking_polls_count: usize,
pub non_blocking_polls_count: usize,
pub unpolled_count: usize,
pub last_update_time: Instant,
} }
/// A socket pointer is an index assigned to a socket; /// A socket pointer is an index assigned to a socket;
@@ -493,7 +500,11 @@ impl AppServer {
events, events,
mio_poll, mio_poll,
all_sockets_drained: false, all_sockets_drained: false,
under_load: DoSOperation::Normal{ blocked_polls: 0}, under_load: DoSOperation::Normal,
blocking_polls_count: 0,
non_blocking_polls_count: 0,
unpolled_count: 0,
last_update_time: Instant::now(),
}) })
} }
@@ -609,14 +620,10 @@ impl AppServer {
ReceivedMessage(len, endpoint) => { ReceivedMessage(len, endpoint) => {
let msg_result = match self.under_load { let msg_result = match self.under_load {
DoSOperation::UnderLoad { last_under_load: _ } => { DoSOperation::UnderLoad => {
println!("Processing msg under load");
self.handle_msg_under_load(&endpoint, &rx[..len], &mut *tx) self.handle_msg_under_load(&endpoint, &rx[..len], &mut *tx)
} }
DoSOperation::Normal { blocked_polls: _} => { DoSOperation::Normal => self.crypt.handle_msg(&rx[..len], &mut *tx),
println!("Processing msg normally");
self.crypt.handle_msg(&rx[..len], &mut *tx)
}
}; };
match msg_result { match msg_result {
Err(ref e) => { Err(ref e) => {
@@ -794,36 +801,63 @@ impl AppServer {
// only poll if we drained all sockets before // only poll if we drained all sockets before
if self.all_sockets_drained { if self.all_sockets_drained {
//Non blocked polling //Non blocked polling
self.mio_poll.poll(&mut self.events, Some(Duration::from_secs(0)))?; self.mio_poll
.poll(&mut self.events, Some(Duration::from_secs(0)))?;
if self.events.iter().peekable().peek().is_none() { if self.events.iter().peekable().peek().is_none() {
// if there are no events, then we can just return // if there are no events, then add to blocking poll count
match self.under_load { self.blocking_polls_count += 1;
DoSOperation::Normal { blocked_polls } => { //Execute blocking poll
self.under_load = DoSOperation::Normal {
blocked_polls: blocked_polls + 1,
}
}
_ => {}
}
self.mio_poll.poll(&mut self.events, Some(timeout))?; self.mio_poll.poll(&mut self.events, Some(timeout))?;
} else {
self.non_blocking_polls_count += 1;
} }
} else {
self.unpolled_count += 1;
} }
match self.under_load { //Reset blocking poll count if waiting for more than BLOCKING_POLL_COUNT_DURATION
DoSOperation::Normal { blocked_polls } => { if self.last_update_time.elapsed() > DURATION_UPDATE_UNDER_LOAD_STATUS {
if blocked_polls > NORMAL_OPERATION_THRESHOLD { self.last_update_time = Instant::now();
self.under_load = DoSOperation::UnderLoad { let total_polls = self.blocking_polls_count + self.non_blocking_polls_count;
last_under_load: Instant::now(),
let load_ratio = if total_polls > 0 {
self.non_blocking_polls_count as f64 / total_polls as f64
} else if self.unpolled_count > 0 {
//There are no polls, so we are under load
1.0
} else {
0.0
};
let prev_under_load = self.under_load;
if load_ratio > UNDER_LOAD_RATIO {
self.under_load = DoSOperation::UnderLoad;
//Test feature- if under load goes to normal operation, write to file
#[cfg(feature = "integration_test")]
{
if (prev_under_load == DoSOperation::Normal) {
let sem_name = b"/rp_integration_test_under_dos\0";
// Create or open a semaphore
let sem = unsafe {
libc::sem_open(sem_name.as_ptr() as *const i8, libc::O_CREAT, 0o644, 0)
};
if sem == libc::SEM_FAILED {
panic!("Failed to create or open semaphore");
}
// Post semaphore
unsafe { libc::sem_post(sem) };
} }
} }
} else {
self.under_load = DoSOperation::Normal;
} }
DoSOperation::UnderLoad { last_under_load } => {
if last_under_load.elapsed() > RESET_DURATION { self.blocking_polls_count = 0;
self.under_load = DoSOperation::Normal { blocked_polls: 0 }; self.non_blocking_polls_count = 0;
} self.unpolled_count = 0;
}
} }
// drain all sockets // drain all sockets

View File

@@ -886,7 +886,8 @@ impl CryptoServer {
let mut rx_cookie = [0u8; COOKIE_SIZE]; let mut rx_cookie = [0u8; COOKIE_SIZE];
let mut rx_mac = [0u8; MAC_SIZE]; let mut rx_mac = [0u8; MAC_SIZE];
let mut rx_sid = [0u8; 4]; let mut rx_sid = [0u8; 4];
match rx_buf[0].try_into() { let msg_type = rx_buf[0].try_into();
match msg_type {
Ok(MsgType::InitHello) => { Ok(MsgType::InitHello) => {
let msg_in = rx_buf.envelope::<InitHello<&[u8]>>()?; let msg_in = rx_buf.envelope::<InitHello<&[u8]>>()?;
expected.copy_from_slice( expected.copy_from_slice(

View File

@@ -1,4 +1,4 @@
use std::{fs, net::UdpSocket, path::PathBuf, process::Stdio, time::Duration}; use std::{fs, net::UdpSocket, path::PathBuf, process::Stdio, thread::sleep, time::Duration};
const BIN: &str = "rosenpass"; const BIN: &str = "rosenpass";
@@ -119,6 +119,7 @@ fn check_exchange_under_normal() {
} }
// check that we can exchange keys // check that we can exchange keys
#[cfg(feature = "integration_test")]
#[test] #[test]
fn check_exchange_under_dos() { fn check_exchange_under_dos() {
let tmpdir = PathBuf::from(env!("CARGO_TARGET_TMPDIR")).join("exchange-dos"); let tmpdir = PathBuf::from(env!("CARGO_TARGET_TMPDIR")).join("exchange-dos");
@@ -143,6 +144,14 @@ fn check_exchange_under_dos() {
assert!(pub_key_path.is_file()); assert!(pub_key_path.is_file());
} }
//Create a semaphore. The server will unblock this semaphore after it is under load condition.
//There are parameters setup under app_server to remain in load condition once triggered for this test feature.
let sem_name = b"/rp_integration_test_under_dos\0";
let sem = unsafe { libc::sem_open(sem_name.as_ptr() as *const i8, libc::O_CREAT, 0o644, 1) };
unsafe {
libc::sem_wait(sem);
}
// start first process, the server // start first process, the server
let port = find_udp_socket(); let port = find_udp_socket();
let listen_addr = format!("localhost:{port}"); let listen_addr = format!("localhost:{port}");
@@ -158,29 +167,56 @@ fn check_exchange_under_dos() {
//.stdout(Stdio::null()) //.stdout(Stdio::null())
//.stderr(Stdio::null()) //.stderr(Stdio::null())
.spawn() .spawn()
.expect("Failed to start {BIN}"); .expect("Test setup failed- Failed to start {server_bin}");
std::thread::sleep(Duration::from_millis(500)); //Create a UDP socket for DOS sender
//DoS Sender
//Create a UDP socket
let socket = UdpSocket::bind("127.0.0.1:0").expect("couldn't bind to address"); let socket = UdpSocket::bind("127.0.0.1:0").expect("couldn't bind to address");
//Spawn a thread to send DoS packets
let server_addr = listen_addr.clone(); let server_addr = listen_addr.clone();
//Create thread safe atomic bool to stop the DoS attack //Create thread safe atomic bool to stop the DoS attack
let stop_dos = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let stop_dos = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let stop_dos_handle = stop_dos.clone(); let stop_dos_handle = stop_dos.clone();
//Spawn a thread to send DoS packets
let dos_attack = std::thread::spawn(move || { let dos_attack = std::thread::spawn(move || {
while stop_dos.load(std::sync::atomic::Ordering::Relaxed) == false { while stop_dos.load(std::sync::atomic::Ordering::Relaxed) == false {
let buf = [0; 10]; let buf = [0; 10];
socket socket
.send_to(&buf, &server_addr) .send_to(&buf, &server_addr)
.expect("couldn't send data"); .expect("couldn't send data");
sleep(Duration::from_micros(10));
} }
}); });
//Wait till we are under load condition for upto 5 seconds
let mut ts = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
let now = std::time::SystemTime::now();
let timeout_absolute = now + Duration::from_secs(5);
if let Ok(duration) = timeout_absolute.duration_since(std::time::SystemTime::UNIX_EPOCH) {
ts.tv_sec = duration.as_secs() as libc::time_t;
ts.tv_nsec = duration.subsec_nanos() as _;
} else {
panic!("Test setup failed- Failed to calculate timeout for semaphore");
}
let mut failed_wait = false;
if (unsafe { libc::sem_timedwait(sem, &ts) } == -1) {
failed_wait = true;
}
// Close and unlink the semaphore
if unsafe { libc::sem_close(sem) } == -1 {
panic!("Test setup failed- Failed to close semaphore");
}
if unsafe { libc::sem_unlink(sem_name.as_ptr() as *const i8) } == -1 {
panic!("Test setup failed- Failed to unlink semaphore");
}
if failed_wait {
panic!("Failed to wait for semaphore- load condition not reached");
}
// start second process, the client // start second process, the client
let mut client = test_bin::get_test_bin(BIN) let mut client = test_bin::get_test_bin(BIN)
.args(["exchange", "secret-key"]) .args(["exchange", "secret-key"])
@@ -197,8 +233,8 @@ fn check_exchange_under_dos() {
.spawn() .spawn()
.expect("Failed to start {BIN}"); .expect("Failed to start {BIN}");
// give them some time to do the key exchange // give them some time to do the key exchange under load
std::thread::sleep(Duration::from_secs(2)); std::thread::sleep(Duration::from_secs(10));
// time's up, kill the childs // time's up, kill the childs
server.kill().unwrap(); server.kill().unwrap();