feat: Add enable_wg_broker feature using MioBrokerClient

doc: Add documentation for new methods and arguments

fix: Require new psk_broker_spawn flag to use broker without extra parameters, to make all-features cargo test pass

fix: Fix MioBrokerClient buffer size to allow room for length prefix

fix: Fix remaining issue with panic
This commit is contained in:
Katherine Watson
2024-07-24 18:43:01 -07:00
parent 191fb10663
commit 065b0fcc8a
6 changed files with 190 additions and 10 deletions

12
Cargo.lock generated
View File

@@ -449,6 +449,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
[[package]]
name = "command-fds"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f190f3c954f7bca3c6296d0ec561c739bdbe6c7e990294ed168d415f6e1b5b01"
dependencies = [
"nix 0.27.1",
"thiserror",
]
[[package]]
name = "cpufeatures"
version = "0.2.12"
@@ -1788,6 +1798,7 @@ version = "0.2.1"
dependencies = [
"anyhow",
"clap 4.5.15",
"command-fds",
"criterion",
"derive_builder 0.20.0",
"env_logger",
@@ -1808,6 +1819,7 @@ dependencies = [
"rosenpass-to",
"rosenpass-util",
"rosenpass-wireguard-broker",
"rustix",
"serde",
"serial_test",
"stacker",

View File

@@ -53,6 +53,8 @@ zeroize = { workspace = true }
hex-literal = { workspace = true, optional = true }
hex = { workspace = true, optional = true }
heck = { workspace = true, optional = true }
command-fds = { workspace = true }
rustix = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
@@ -66,6 +68,7 @@ procspawn = {workspace = true}
tempfile = { workspace = true }
[features]
enable_wg_broker = ["enable_broker_api"]
enable_broker_api = ["rosenpass-wireguard-broker/enable_broker_api"]
experiment_memfd_secret = []
experiment_libcrux = ["rosenpass-ciphers/experiment_libcrux"]

View File

@@ -7,6 +7,7 @@ use rosenpass_util::file::{LoadValue, LoadValueB64, StoreValue};
use rosenpass_wireguard_broker::brokers::native_unix::{
NativeUnixBroker, NativeUnixBrokerConfigBaseBuilder, NativeUnixBrokerConfigBaseBuilderError,
};
use rosenpass_wireguard_broker::WireguardBrokerMio;
use std::ops::DerefMut;
use std::path::PathBuf;
@@ -16,6 +17,28 @@ use crate::protocol::{SPk, SSk, SymKey};
use super::config;
#[cfg(feature = "enable_wg_broker")]
use {
command_fds::{CommandFdExt, FdMapping},
log::{error, info},
mio::net::UnixStream,
rosenpass_util::fd::claim_fd,
rosenpass_wireguard_broker::brokers::mio_client::MioBrokerClient,
rustix::fd::AsRawFd,
rustix::net::{socketpair, AddressFamily, SocketFlags, SocketType},
std::os::unix::net,
std::process::Command,
std::thread,
};
/// enum representing a choice of interface to a WireGuard broker
#[derive(Debug)]
pub enum BrokerInterface {
Socket(PathBuf),
FileDescriptor(i32),
SocketPair,
}
/// struct holding all CLI arguments for `clap` crate to parse
#[derive(Parser, Debug)]
#[command(author, version, about, long_about)]
@@ -36,6 +59,26 @@ pub struct CliArgs {
#[cfg(feature = "experiment_api")]
api: crate::api::cli::ApiCli,
/// path of the wireguard_psk broker socket to connect to
#[cfg(feature = "enable_wg_broker")]
#[arg(long, group = "psk-broker-specs")]
psk_broker_path: Option<PathBuf>,
/// fd of the wireguard_spk broker socket to connect to
///
/// when this command is called from another process, the other process can open and bind the
/// Unix socket for the psk broker connection to use themselves, passing it to this process --
/// in Rust this can be achieved using the
/// [command-fds](https://docs.rs/command-fds/latest/command_fds/) crate
#[cfg(feature = "enable_wg_broker")]
#[arg(long, group = "psk-broker-specs")]
psk_broker_fd: Option<i32>,
/// spawn a psk broker locally using a socket pair
#[cfg(feature = "enable_wg_broker")]
#[arg(short, long, group = "psk-broker-specs")]
psk_broker_spawn: bool,
#[command(subcommand)]
pub command: CliCommand,
}
@@ -65,6 +108,28 @@ impl CliArgs {
}
None
}
#[cfg(feature = "enable_wg_broker")]
/// returns the broker interface set by CLI args
/// returns `None` if the `enable_wg_broker` feature isn't enabled
pub fn get_broker_interface(&self) -> Option<BrokerInterface> {
if let Some(path_ref) = self.psk_broker_path.as_ref() {
Some(BrokerInterface::Socket(path_ref.to_path_buf()))
} else if let Some(fd) = self.psk_broker_fd {
Some(BrokerInterface::FileDescriptor(fd))
} else if self.psk_broker_spawn {
Some(BrokerInterface::SocketPair)
} else {
None
}
}
#[cfg(not(feature = "enable_wg_broker"))]
/// returns the broker interface set by CLI args
/// returns `None` if the `enable_wg_broker` feature isn't enabled
pub fn get_broker_interface(&self) -> Option<BrokerInterface> {
None
}
}
/// represents a command specified via CLI
@@ -164,7 +229,11 @@ impl CliArgs {
///
/// ## TODO
/// - This method consumes the [`CliCommand`] value. It might be wise to use a reference...
pub fn run(self, test_helpers: Option<AppServerTest>) -> anyhow::Result<()> {
pub fn run(
self,
broker_interface: Option<BrokerInterface>,
test_helpers: Option<AppServerTest>,
) -> anyhow::Result<()> {
use CliCommand::*;
match &self.command {
Man => {
@@ -273,7 +342,7 @@ impl CliArgs {
config.validate()?;
self.apply_to_config(&mut config)?;
Self::event_loop(config, test_helpers)?;
Self::event_loop(config, broker_interface, test_helpers)?;
}
Exchange {
@@ -293,7 +362,7 @@ impl CliArgs {
config.validate()?;
self.apply_to_config(&mut config)?;
Self::event_loop(config, test_helpers)?;
Self::event_loop(config, broker_interface, test_helpers)?;
}
Validate { config_files } => {
@@ -317,6 +386,7 @@ impl CliArgs {
fn event_loop(
config: config::Rosenpass,
broker_interface: Option<BrokerInterface>,
test_helpers: Option<AppServerTest>,
) -> anyhow::Result<()> {
const MAX_PSK_SIZE: usize = 1000;
@@ -336,7 +406,8 @@ impl CliArgs {
config.apply_to_app_server(&mut srv)?;
let broker_store_ptr = srv.register_broker(Box::new(NativeUnixBroker::new()))?;
let broker = Self::create_broker(broker_interface)?;
let broker_store_ptr = srv.register_broker(broker)?;
fn cfg_err_map(e: NativeUnixBrokerConfigBaseBuilderError) -> anyhow::Error {
anyhow::Error::msg(format!("NativeUnixBrokerConfigBaseBuilderError: {:?}", e))
@@ -373,6 +444,90 @@ impl CliArgs {
srv.event_loop()
}
#[cfg(feature = "enable_wg_broker")]
fn create_broker(
broker_interface: Option<BrokerInterface>,
) -> Result<
Box<dyn WireguardBrokerMio<MioError = anyhow::Error, Error = anyhow::Error>>,
anyhow::Error,
> {
if let Some(interface) = broker_interface {
let socket = Self::get_broker_socket(interface)?;
Ok(Box::new(MioBrokerClient::new(socket)))
} else {
Ok(Box::new(NativeUnixBroker::new()))
}
}
#[cfg(not(feature = "enable_wg_broker"))]
fn create_broker(
_broker_interface: Option<BrokerInterface>,
) -> Result<
Box<dyn WireguardBrokerMio<MioError = anyhow::Error, Error = anyhow::Error>>,
anyhow::Error,
> {
Ok(Box::new(NativeUnixBroker::new()))
}
#[cfg(feature = "enable_wg_broker")]
fn get_broker_socket(broker_interface: BrokerInterface) -> Result<UnixStream, anyhow::Error> {
// Connect to the psk broker unix socket if one was specified
// OR OTHERWISE spawn the psk broker and use socketpair(2) to connect with them
match broker_interface {
BrokerInterface::Socket(broker_path) => {
let sock = net::UnixStream::connect(broker_path)?;
sock.set_nonblocking(true)?;
Ok(UnixStream::from_std(sock))
}
BrokerInterface::FileDescriptor(broker_fd) => {
// mio::net::UnixStream doesn't implement From<OwnedFd>, so we have to go through std
let sock = net::UnixStream::from(claim_fd(broker_fd)?);
sock.set_nonblocking(true)?;
Ok(UnixStream::from_std(sock))
}
BrokerInterface::SocketPair => {
// Form a socketpair for communicating to the broker
let (ours, theirs) = socketpair(
AddressFamily::UNIX,
SocketType::STREAM,
SocketFlags::empty(),
None,
)?;
// Setup our end of the socketpair
let ours = net::UnixStream::from(ours);
ours.set_nonblocking(true)?;
// Start the PSK broker
let mut child = Command::new("rosenpass-wireguard-broker-socket-handler")
.args(&["--stream-fd", "3"])
.fd_mappings(vec![FdMapping {
parent_fd: theirs.as_raw_fd(),
child_fd: 3,
}])?
.spawn()?;
// Handle the PSK broker crashing
thread::spawn(move || {
let status = child.wait();
if let Ok(status) = status {
if status.success() {
// Maybe they are doing double forking?
info!("PSK broker exited.");
} else {
error!("PSK broker exited with an error ({status:?})");
}
} else {
error!("Wait on PSK broker process failed ({status:?})");
}
});
Ok(UnixStream::from_std(ours))
}
}
}
}
/// generate secret and public keys, store in files according to the paths passed as arguments

View File

@@ -34,7 +34,8 @@ pub fn main() {
// error!("error dummy");
}
match args.run(None) {
let broker_interface = args.get_broker_interface();
match args.run(broker_interface, None) {
Ok(_) => {}
Err(e) => {
error!("{e:?}");

View File

@@ -108,7 +108,7 @@ fn run_server_client_exchange(
.termination_handler(Some(server_terminate_rx))
.build()
.unwrap();
cli.run(Some(test_helpers)).unwrap();
cli.run(None, Some(test_helpers)).unwrap();
});
let cli = CliArgs::try_parse_from(
@@ -123,7 +123,7 @@ fn run_server_client_exchange(
.termination_handler(Some(client_terminate_rx))
.build()
.unwrap();
cli.run(Some(test_helpers)).unwrap();
cli.run(None, Some(test_helpers)).unwrap();
});
// give them some time to do the key exchange under load

View File

@@ -1,6 +1,8 @@
use anyhow::{bail, ensure};
use mio::Interest;
use rosenpass_util::ord::max_usize;
use std::collections::VecDeque;
use std::dbg;
use std::io::{ErrorKind, Read, Write};
use crate::{SerializedBrokerConfig, WireGuardBroker, WireguardBrokerMio};
@@ -16,7 +18,7 @@ pub struct MioBrokerClient {
}
const LEN_SIZE: usize = 8;
const RECV_BUF_SIZE: usize = RESPONSE_MSG_BUFFER_SIZE;
const RECV_BUF_SIZE: usize = max_usize(LEN_SIZE, RESPONSE_MSG_BUFFER_SIZE);
#[derive(Debug)]
struct MioBrokerClientIo {
@@ -27,7 +29,7 @@ struct MioBrokerClientIo {
recv_buf: [u8; RECV_BUF_SIZE],
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum RxState {
//Recieving size with buffer offset
RxSize(usize),
@@ -152,10 +154,17 @@ impl BrokerClientIo for MioBrokerClientIo {
{
let bytes = raw_recv(&self.socket, &mut self.recv_buf[x..y])?;
// If we've received nothing so far, and raw_recv came up empty,
// then let the broker client know nothing came
if self.recv_state == RxState::RxSize(0) && bytes == 0 {
return Ok(None);
}
if x + bytes == y {
return Ok(Some(&self.recv_buf[0..y]));
}
//We didn't recieve everything so let's assume something went wrong
// We didn't recieve everything so let's assume something went wrong
self.recv_state = RxState::RxSize(0);
self.expected_state = RxState::RxSize(LEN_SIZE);
bail!("Invalid state");