From 53e560191f7d76986f17757fe105dad5ae2636c6 Mon Sep 17 00:00:00 2001 From: Karolin Varner Date: Sun, 18 Aug 2024 19:33:56 +0200 Subject: [PATCH] feat(API): Close API connections after error --- rosenpass/src/api/mio/connection.rs | 24 ++++++++----- rosenpass/src/api/mio/manager.rs | 16 +++++++-- .../tests/api-integration-tests-api-setup.rs | 34 +++++++++++++++++-- util/src/mem.rs | 14 ++++++++ 4 files changed, 75 insertions(+), 13 deletions(-) diff --git a/rosenpass/src/api/mio/connection.rs b/rosenpass/src/api/mio/connection.rs index 6753b76..52de31b 100644 --- a/rosenpass/src/api/mio/connection.rs +++ b/rosenpass/src/api/mio/connection.rs @@ -85,6 +85,20 @@ impl MioConnection { api_handler: api_state, }) } + + pub fn shoud_close(&self) -> bool { + let exhausted = self + .buffers + .as_ref() + .map(|b| b.write_buffer.exhausted()) + .unwrap_or(false); + self.invalid_read && exhausted + } + + pub fn close(mut self, app_server: &mut AppServer) -> anyhow::Result<()> { + app_server.mio_poll.registry().deregister(&mut self.io)?; + Ok(()) + } } pub trait MioConnectionContext { @@ -211,12 +225,7 @@ pub trait MioConnectionContext { log::warn!("Received message on API that was too big to fit in our buffers; \ looks like the client is broken. Stopping to process messages of the client.\n\ Error: {e:?}"); - // TODO: We should properly close down the socket in this case, but to do that, - // we need to have the facilities in the Rosenpass IO handling system to close - // open connections. - // Just leaving the API connections dangling for now. - // This should be fixed for non-experimental use of the API. - conn.invalid_read = true; + conn.invalid_read = true; // Closed mio_manager break Ok(None); } @@ -235,8 +244,7 @@ pub trait MioConnectionContext { The connection is broken. Stopping to process messages of the client.\n\ Error: {e:?}" ); - // TODO: Same as above - conn.invalid_read = true; + conn.invalid_read = true; // closed later by mio_manager break Err(e.into()); } }; diff --git a/rosenpass/src/api/mio/manager.rs b/rosenpass/src/api/mio/manager.rs index 57bbc7b..bbf4265 100644 --- a/rosenpass/src/api/mio/manager.rs +++ b/rosenpass/src/api/mio/manager.rs @@ -87,9 +87,19 @@ pub trait MioManagerContext { } fn poll_connections(&mut self) -> anyhow::Result<()> { - for idx in 0..self.mio_manager().connections.len() { - let mut foc: MioConnectionFocus = MioConnectionFocus::new(self, idx); - foc.poll()?; + let mut idx = 0; + while idx < self.mio_manager().connections.len() { + if self.mio_manager().connections[idx].shoud_close() { + let conn = self.mio_manager_mut().connections.remove(idx); + if let Err(e) = conn.close(self.app_server_mut()) { + log::warn!("Error while closing API connection {e:?}"); + }; + continue; + } + + MioConnectionFocus::new(self, idx).poll()?; + + idx += 1; } Ok(()) } diff --git a/rosenpass/tests/api-integration-tests-api-setup.rs b/rosenpass/tests/api-integration-tests-api-setup.rs index 6c6dcc7..b45f130 100644 --- a/rosenpass/tests/api-integration-tests-api-setup.rs +++ b/rosenpass/tests/api-integration-tests-api-setup.rs @@ -1,5 +1,6 @@ use std::{ - io::{BufRead, BufReader}, + borrow::Borrow, + io::{BufRead, BufReader, Write}, os::unix::net::UnixStream, process::Stdio, thread::sleep, @@ -7,6 +8,7 @@ use std::{ }; use anyhow::{bail, Context}; +use command_fds::{CommandFdExt, FdMapping}; use hex_literal::hex; use rosenpass::api::{ self, add_listen_socket_response_status, add_psk_broker_response_status, @@ -15,11 +17,13 @@ use rosenpass::api::{ use rosenpass_util::{ b64::B64Display, file::LoadValueB64, + io::IoErrorKind, length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder}, + mem::MoveExt, mio::WriteWithFileDescriptors, zerocopy::ZerocopySliceExt, }; -use rustix::fd::AsFd; +use rustix::fd::{AsFd, AsRawFd}; use tempfile::TempDir; use zerocopy::AsBytes; @@ -111,8 +115,17 @@ fn api_integration_api_setup() -> anyhow::Result<()> { peer_a.commit()?; peer_b.commit()?; + let (deliberate_fail_api_client, deliberate_fail_api_server) = + std::os::unix::net::UnixStream::pair()?; + let deliberate_fail_child_fd = 3; + // Start peer a let _proc_a = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass")) + .args(["--api-stream-fd", &deliberate_fail_child_fd.to_string()]) + .fd_mappings(vec![FdMapping { + parent_fd: deliberate_fail_api_server.move_here().as_raw_fd(), + child_fd: 3, + }])? .args([ "exchange-config", peer_a.config_file_path.to_str().context("")?, @@ -173,6 +186,23 @@ fn api_integration_api_setup() -> anyhow::Result<()> { ); } + // Deliberately break API connection given via FD; this checks that the + // API connections are closed when invalid data is received and it also + // implicitly checks that other connections are unaffected + { + use std::io::ErrorKind as K; + let client = deliberate_fail_api_client; + let err = loop { + if let Err(e) = client.borrow().write(&[0xffu8; 16]) { + break e; + } + }; + assert!(matches!( + err.io_error_kind(), + K::ConnectionReset | K::BrokenPipe + )); + } + // Send SupplyKeypairRequest { use rustix::fs::{open, Mode, OFlags}; diff --git a/util/src/mem.rs b/util/src/mem.rs index 2cc5785..e6e22cb 100644 --- a/util/src/mem.rs +++ b/util/src/mem.rs @@ -136,3 +136,17 @@ impl SwapWithDefaultExt for T { self.swap_with(Self::default()) } } + +pub trait MoveExt { + /// Deliberately move the value + /// + /// Usually employed to enforce an object being + /// dropped after use. + fn move_here(self) -> Self; +} + +impl MoveExt for T { + fn move_here(self) -> Self { + self + } +}