feat(API): Close API connections after error

This commit is contained in:
Karolin Varner
2024-08-18 19:33:56 +02:00
parent 93cd266c68
commit 53e560191f
4 changed files with 75 additions and 13 deletions

View File

@@ -85,6 +85,20 @@ impl MioConnection {
api_handler: api_state, api_handler: api_state,
}) })
} }
pub fn shoud_close(&self) -> bool {
let exhausted = self
.buffers
.as_ref()
.map(|b| b.write_buffer.exhausted())
.unwrap_or(false);
self.invalid_read && exhausted
}
pub fn close(mut self, app_server: &mut AppServer) -> anyhow::Result<()> {
app_server.mio_poll.registry().deregister(&mut self.io)?;
Ok(())
}
} }
pub trait MioConnectionContext { pub trait MioConnectionContext {
@@ -211,12 +225,7 @@ pub trait MioConnectionContext {
log::warn!("Received message on API that was too big to fit in our buffers; \ log::warn!("Received message on API that was too big to fit in our buffers; \
looks like the client is broken. Stopping to process messages of the client.\n\ looks like the client is broken. Stopping to process messages of the client.\n\
Error: {e:?}"); Error: {e:?}");
// TODO: We should properly close down the socket in this case, but to do that, conn.invalid_read = true; // Closed mio_manager
// we need to have the facilities in the Rosenpass IO handling system to close
// open connections.
// Just leaving the API connections dangling for now.
// This should be fixed for non-experimental use of the API.
conn.invalid_read = true;
break Ok(None); break Ok(None);
} }
@@ -235,8 +244,7 @@ pub trait MioConnectionContext {
The connection is broken. Stopping to process messages of the client.\n\ The connection is broken. Stopping to process messages of the client.\n\
Error: {e:?}" Error: {e:?}"
); );
// TODO: Same as above conn.invalid_read = true; // closed later by mio_manager
conn.invalid_read = true;
break Err(e.into()); break Err(e.into());
} }
}; };

View File

@@ -87,9 +87,19 @@ pub trait MioManagerContext {
} }
fn poll_connections(&mut self) -> anyhow::Result<()> { fn poll_connections(&mut self) -> anyhow::Result<()> {
for idx in 0..self.mio_manager().connections.len() { let mut idx = 0;
let mut foc: MioConnectionFocus<Self> = MioConnectionFocus::new(self, idx); while idx < self.mio_manager().connections.len() {
foc.poll()?; if self.mio_manager().connections[idx].shoud_close() {
let conn = self.mio_manager_mut().connections.remove(idx);
if let Err(e) = conn.close(self.app_server_mut()) {
log::warn!("Error while closing API connection {e:?}");
};
continue;
}
MioConnectionFocus::new(self, idx).poll()?;
idx += 1;
} }
Ok(()) Ok(())
} }

View File

@@ -1,5 +1,6 @@
use std::{ use std::{
io::{BufRead, BufReader}, borrow::Borrow,
io::{BufRead, BufReader, Write},
os::unix::net::UnixStream, os::unix::net::UnixStream,
process::Stdio, process::Stdio,
thread::sleep, thread::sleep,
@@ -7,6 +8,7 @@ use std::{
}; };
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use command_fds::{CommandFdExt, FdMapping};
use hex_literal::hex; use hex_literal::hex;
use rosenpass::api::{ use rosenpass::api::{
self, add_listen_socket_response_status, add_psk_broker_response_status, self, add_listen_socket_response_status, add_psk_broker_response_status,
@@ -15,11 +17,13 @@ use rosenpass::api::{
use rosenpass_util::{ use rosenpass_util::{
b64::B64Display, b64::B64Display,
file::LoadValueB64, file::LoadValueB64,
io::IoErrorKind,
length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder}, length_prefix_encoding::{decoder::LengthPrefixDecoder, encoder::LengthPrefixEncoder},
mem::MoveExt,
mio::WriteWithFileDescriptors, mio::WriteWithFileDescriptors,
zerocopy::ZerocopySliceExt, zerocopy::ZerocopySliceExt,
}; };
use rustix::fd::AsFd; use rustix::fd::{AsFd, AsRawFd};
use tempfile::TempDir; use tempfile::TempDir;
use zerocopy::AsBytes; use zerocopy::AsBytes;
@@ -111,8 +115,17 @@ fn api_integration_api_setup() -> anyhow::Result<()> {
peer_a.commit()?; peer_a.commit()?;
peer_b.commit()?; peer_b.commit()?;
let (deliberate_fail_api_client, deliberate_fail_api_server) =
std::os::unix::net::UnixStream::pair()?;
let deliberate_fail_child_fd = 3;
// Start peer a // Start peer a
let _proc_a = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass")) let _proc_a = std::process::Command::new(env!("CARGO_BIN_EXE_rosenpass"))
.args(["--api-stream-fd", &deliberate_fail_child_fd.to_string()])
.fd_mappings(vec![FdMapping {
parent_fd: deliberate_fail_api_server.move_here().as_raw_fd(),
child_fd: 3,
}])?
.args([ .args([
"exchange-config", "exchange-config",
peer_a.config_file_path.to_str().context("")?, peer_a.config_file_path.to_str().context("")?,
@@ -173,6 +186,23 @@ fn api_integration_api_setup() -> anyhow::Result<()> {
); );
} }
// Deliberately break API connection given via FD; this checks that the
// API connections are closed when invalid data is received and it also
// implicitly checks that other connections are unaffected
{
use std::io::ErrorKind as K;
let client = deliberate_fail_api_client;
let err = loop {
if let Err(e) = client.borrow().write(&[0xffu8; 16]) {
break e;
}
};
assert!(matches!(
err.io_error_kind(),
K::ConnectionReset | K::BrokenPipe
));
}
// Send SupplyKeypairRequest // Send SupplyKeypairRequest
{ {
use rustix::fs::{open, Mode, OFlags}; use rustix::fs::{open, Mode, OFlags};

View File

@@ -136,3 +136,17 @@ impl<T: Default> SwapWithDefaultExt for T {
self.swap_with(Self::default()) self.swap_with(Self::default())
} }
} }
pub trait MoveExt {
/// Deliberately move the value
///
/// Usually employed to enforce an object being
/// dropped after use.
fn move_here(self) -> Self;
}
impl<T: Sized> MoveExt for T {
fn move_here(self) -> Self {
self
}
}