From 258efe408cba74745dabe41a730e04b5a06b0b2d Mon Sep 17 00:00:00 2001 From: Karolin Varner Date: Thu, 15 Aug 2024 22:14:48 +0200 Subject: [PATCH] fix: PSK broker integration did not work This commit resolves multiple issues with the PSK broker integration. - The manual testing procedure never actually utilized the brokers due to the use of the outfile option, this led to issues with the broker being hidden. - The manual testing procedure omitted checking whether a PSK was actually sent to WireGuard entirely. This was fixed by writing an entirely new manual integration testing shell-script that can serve as a blueprint for future integration tests. - Many parts of the PSK broker code did not report (log) errors accurately; added error logging - BrokerServer set message.payload.return_code to the msg_type value, this led to crashes - The PSK broker commands all omitted to set the memfd policy, this led to immediate crashes once secrets where actually allocated - The MioBrokerClient IO state machine was broken and the design was too obtuse to debug. The state machine returned the length prefix as a message instead of actually interpreting it as a state machine. Seems the code was integrated but never actually tested. This was fixed by rewriting the entire state machine code using the new LengthPrefixEncoder/Decoder facilities. A write-buffer that was not being flushed is now handled by flushing the buffer in blocking-io mode. --- Cargo.lock | 2 + Cargo.toml | 1 + doc/broker/broker_testing.md | 49 ---- manual_tests/psk_broker/peer_a.rp.config | 13 + manual_tests/psk_broker/peer_b.rp.config | 14 ++ manual_tests/psk_broker/run_test.sh | 215 ++++++++++++++++ rosenpass/Cargo.toml | 2 +- rosenpass/src/cli.rs | 2 +- wireguard-broker/Cargo.toml | 5 +- wireguard-broker/src/api/config.rs | 2 +- wireguard-broker/src/api/server.rs | 9 +- wireguard-broker/src/bin/priviledged.rs | 8 + wireguard-broker/src/bin/socket_handler.rs | 8 + wireguard-broker/src/brokers/mio_client.rs | 277 +++++++++------------ wireguard-broker/src/brokers/netlink.rs | 1 + 15 files changed, 398 insertions(+), 210 deletions(-) delete mode 100644 doc/broker/broker_testing.md create mode 100644 manual_tests/psk_broker/peer_a.rp.config create mode 100644 manual_tests/psk_broker/peer_b.rp.config create mode 100755 manual_tests/psk_broker/run_test.sh diff --git a/Cargo.lock b/Cargo.lock index 58bdb0e..628249f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1935,6 +1935,7 @@ dependencies = [ "clap 4.5.15", "derive_builder 0.20.0", "env_logger", + "libc", "log", "mio", "postcard", @@ -1943,6 +1944,7 @@ dependencies = [ "rosenpass-secret-memory", "rosenpass-to", "rosenpass-util", + "rustix", "thiserror", "tokio", "wireguard-uapi", diff --git a/Cargo.toml b/Cargo.toml index a723949..35acc26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ libcrux = { version = "0.0.2-pre.2" } hex-literal = { version = "0.4.1" } hex = { version = "0.4.3" } heck = { version = "0.5.0" } +libc = { version = "0.2" } #Dev dependencies serial_test = "3.1.1" diff --git a/doc/broker/broker_testing.md b/doc/broker/broker_testing.md deleted file mode 100644 index 53f87b9..0000000 --- a/doc/broker/broker_testing.md +++ /dev/null @@ -1,49 +0,0 @@ -## Experimental Broker Feature Testing - -In order to test the experimental broker feature, a few manual steps are needed. These will soon be replaced with a revision to the integration test to allow it to optionally use the broker feature, but for the moment manual testing is the only option. - -To manually test the broker feature, start by building Rosenpass with the broker feature: - -```bash -cd rosenpass -cargo build --features=experimental_broker_api -``` - -Next, generate keys for two parties using the example Rosenpass configuration files - -```bash -PATH="$PWD/target/debug:$PATH" rosenpass gen-keys config-examples/peer-a-config.toml -PATH="$PWD/target/debug:$PATH" rosenpass gen-keys config-examples/peer-b-config.toml -``` - -Now, open a second terminal and run the following in one (not using the broker): - -```bash -PATH="$PWD/target/debug:$PATH" rosenpass exchange-config config-examples/peer-a-config.toml -``` - -and the following in the other (spawning a broker and communicating with it via socketpair(2)): - -```bash -cd rosenpass -PATH="$PWD/target/debug:$PATH" rosenpass --psk_broker_spawn exchange-config config-examples/peer-a-config.toml -``` - -You should see the two parties exchange keys, and can view the shared PSK via `wg show`. - -In order to test using a Unix socket at a provided path instead, replace the above command with this: - -```bash -PATH="$PWD/target/debug:$PATH" rosenpass --psk_broker_path broker.sock exchange-config config-examples/peer-a-config.toml -``` - -Then, in a third terminal, run the following - -```bash -cd rosenpass -PATH="$PWD/target/debug:$PATH" rosenpass-wireguard-broker-socket-handler --listen-path broker.sock -``` - -You should see the two parties exchange keys. - -The `--psk_broker_fd` feature can be similarly tested, but would require a separate script providing an open file descriptor to do so. diff --git a/manual_tests/psk_broker/peer_a.rp.config b/manual_tests/psk_broker/peer_a.rp.config new file mode 100644 index 0000000..abc7478 --- /dev/null +++ b/manual_tests/psk_broker/peer_a.rp.config @@ -0,0 +1,13 @@ +secret_key = "peer_a.rp.sk" +public_key = "peer_a.rp.pk" +listen = ["[::1]:46127"] +verbosity = "Verbose" + +[api] +listen_path = [] +listen_fd = [] +stream_fd = [] + +[[peers]] +public_key = "peer_b.rp.pk" +device = "rpPskBrkTestA" diff --git a/manual_tests/psk_broker/peer_b.rp.config b/manual_tests/psk_broker/peer_b.rp.config new file mode 100644 index 0000000..916930f --- /dev/null +++ b/manual_tests/psk_broker/peer_b.rp.config @@ -0,0 +1,14 @@ +secret_key = "peer_b.rp.sk" +public_key = "peer_b.rp.pk" +listen = [] +verbosity = "Verbose" + +[api] +listen_path = [] +listen_fd = [] +stream_fd = [] + +[[peers]] +public_key = "peer_a.rp.pk" +endpoint = "[::1]:46127" +device = "rpPskBrkTestB" diff --git a/manual_tests/psk_broker/run_test.sh b/manual_tests/psk_broker/run_test.sh new file mode 100755 index 0000000..bad1a08 --- /dev/null +++ b/manual_tests/psk_broker/run_test.sh @@ -0,0 +1,215 @@ +#! /bin/bash + +set -e -o pipefail + +enquote() { + while (( "$#" > 1)); do + printf "%q " "$1" + shift + done + if (("$#" > 0)); then + printf "%q" "$1" + fi +} + +CLEANUP_HOOKS=() +hook_cleanup() { + local hook + set +e +o pipefail + for hook in "${CLEANUP_HOOKS[@]}"; do + eval "${hook}" + done +} + +cleanup() { + CLEANUP_HOOKS=("$(enquote exc_with_ctx cleanup "$@")" "${CLEANUP_HOOKS[@]}") +} + +cleanup_eval() { + cleanup eval "$*" +} + +stderr() { + echo >&2 "$@" +} + +log() { + local level; level="$1"; shift || fatal "USAGE: log LVL MESSAGE.." + stderr "[${level}]" "$@" +} + +info() { + log "INFO" "$@" +} + +debug() { + log "DEBUG" "$@" +} + +fatal() { + log "FATAL" "$@" + exit 1 +} + +assert() { + local msg; msg="$1"; shift || fatal "USAGE: assert_cmd MESSAGE COMMAND.." + "$@" || fatal "${msg}" +} + +abs_dir() { + local dir; dir="$1"; shift || fatal "USAGE: abs_dir DIR" + ( + cd "${dir}" + pwd -P + ) +} + +exc_with_ctx() { + local ctx; ctx="$1"; shift || fatal "USAGE: exc_with_ctx CONTEXT COMMAND.." + if [[ -z "${ctx}" ]]; then + info '$' "$@" + else + info "${ctx}\$" "$@" + fi + + "$@" +} + +exc() { + exc_with_ctx "" "$@" +} + +exc_eval() { + exc eval "$*" +} + +exc_eval_with_ctx() { + local ctx; ctx="$1"; shift || fatal "USAGE: exc_eval_with_ctx CONTEXT EVAL_COMMAND.." + exc_with_ctx "eval:${ctx}" "$*" +} + +exc_as_user() { + exc sudo -u "${SUDO_USER}" "$@" +} + +exc_eval_as_user() { + exc_as_user bash -c "$*" +} + +fork_eval_as_user() { + exc sudo -u "${SUDO_USER}" bash -c "$*" & + local pid; pid="$!" + cleanup wait "${pid}" + cleanup pkill -2 -P "${pid}" # Reverse ordering +} + +info_success() { + stderr + stderr + if [[ "${SUCCESS}" = 1 ]]; then + stderr " Test was a success!" + else + stderr " !!! TEST WAS A FAILURE!!!" + fi + stderr +} + +main() { + assert "Use as root with sudo" [ "$(id -u)" -eq 0 ] + assert "Use as root with sudo" [ -n "${SUDO_UID}" ] + assert "SUDO_UID is 0; refusing to build as root" [ "${SUDO_UID}" -ne 0 ] + + cleanup info_success + + trap hook_cleanup EXIT + + SCRIPT="$0" + CFG_TEMPLATE_DIR="$(abs_dir "$(dirname "${SCRIPT}")")" + REPO="$(abs_dir "${CFG_TEMPLATE_DIR}/../..")" + BINS="${REPO}/target/debug" + + # Create temp dir + TMP_DIR="/tmp/rosenpass-psk-broker-test-$(date +%s)-$(uuidgen)" + cleanup rm -rf "${TMP_DIR}" + exc_as_user mkdir -p "${TMP_DIR}" + + # Copy config + CFG_DIR="${TMP_DIR}/cfg" + exc_as_user cp -R "${CFG_TEMPLATE_DIR}" "${CFG_DIR}" + + exc umask 077 + + exc cd "${REPO}" + local build_cmd; build_cmd=(cargo build --workspace --color=always --all-features --bins --profile dev) + if test -e "${BINS}/rosenpass-wireguard-broker-privileged" -a -e "${BINS}/rosenpass"; then + info "Found the binaries rosenpass-wireguard-broker-privileged and rosenpass." \ + "Run following commands as a regular user to recompile the binaries with the right options" \ + "in case of an error:" '$' "${build_cmd[@]}" + else + exc_as_user "${build_cmd[@]}" + fi + exc sudo setcap CAP_NET_ADMIN=+eip "${BINS}/rosenpass-wireguard-broker-privileged" + + exc cd "${CFG_DIR}" + exc_eval_as_user "wg genkey > peer_a.wg.sk" + exc_eval_as_user "wg pubkey < peer_a.wg.sk > peer_a.wg.pk" + exc_eval_as_user "wg genkey > peer_b.wg.sk" + exc_eval_as_user "wg pubkey < peer_b.wg.sk > peer_b.wg.pk" + exc_eval_as_user "wg genpsk > peer_a_invalid.psk" + exc_eval_as_user "wg genpsk > peer_b_invalid.psk" + exc_eval_as_user "echo $(enquote "peer = \"$(cat peer_b.wg.pk)\"") >> peer_a.rp.config" + exc_eval_as_user "echo $(enquote "peer = \"$(cat peer_a.wg.pk)\"") >> peer_b.rp.config" + exc_as_user "${BINS}"/rosenpass gen-keys peer_a.rp.config + exc_as_user "${BINS}"/rosenpass gen-keys peer_b.rp.config + + cleanup ip l del dev rpPskBrkTestA + cleanup ip l del dev rpPskBrkTestB + exc ip l add dev rpPskBrkTestA type wireguard + exc ip l add dev rpPskBrkTestB type wireguard + + exc wg set rpPskBrkTestA \ + listen-port 46125 \ + private-key peer_a.wg.sk \ + peer "$(cat peer_b.wg.pk)" \ + endpoint 'localhost:46126' \ + preshared-key peer_a_invalid.psk \ + allowed-ips fe80::2/64 + exc wg set rpPskBrkTestB \ + listen-port 46126 \ + private-key peer_b.wg.sk \ + peer "$(cat peer_a.wg.pk)" \ + endpoint 'localhost:46125' \ + preshared-key peer_b_invalid.psk \ + allowed-ips fe80::1/64 + + exc ip l set rpPskBrkTestA up + exc ip l set rpPskBrkTestB up + + exc ip a add fe80::1/64 dev rpPskBrkTestA + exc ip a add fe80::2/64 dev rpPskBrkTestB + + fork_eval_as_user "\ + RUST_LOG='info' \ + PATH=$(enquote "${REPO}/target/debug:${PATH}") \ + $(enquote "${BINS}/rosenpass") --psk-broker-spawn \ + exchange-config peer_a.rp.config" + fork_eval_as_user "\ + RUST_LOG='info' \ + PATH=$(enquote "${REPO}/target/debug:${PATH}") \ + $(enquote "${BINS}/rosenpass-wireguard-broker-socket-handler") \ + --listen-path broker.sock" + fork_eval_as_user "\ + RUST_LOG='info' \ + PATH=$(enquote "$PWD/target/debug:${PATH}") \ + $(enquote "${BINS}/rosenpass") --psk-broker-path broker.sock \ + exchange-config peer_b.rp.config" + + exc_as_user ping -c 2 -w 10 fe80::1%rpPskBrkTestA + exc_as_user ping -c 2 -w 10 fe80::2%rpPskBrkTestB + exc_as_user ping -c 2 -w 10 fe80::2%rpPskBrkTestA + exc_as_user ping -c 2 -w 10 fe80::1%rpPskBrkTestB + + SUCCESS=1 +} + +main "$@" diff --git a/rosenpass/Cargo.toml b/rosenpass/Cargo.toml index 8d2669f..b8b386a 100644 --- a/rosenpass/Cargo.toml +++ b/rosenpass/Cargo.toml @@ -68,7 +68,7 @@ procspawn = {workspace = true} tempfile = { workspace = true } [features] -experiment_memfd_secret = [] +experiment_memfd_secret = ["rosenpass-wireguard-broker/experiment_memfd_secret"] experiment_broker_api = ["rosenpass-wireguard-broker/experimental_broker_api", "command-fds"] experiment_libcrux = ["rosenpass-ciphers/experiment_libcrux"] experiment_api = ["hex-literal"] diff --git a/rosenpass/src/cli.rs b/rosenpass/src/cli.rs index 2353eeb..39ee896 100644 --- a/rosenpass/src/cli.rs +++ b/rosenpass/src/cli.rs @@ -496,7 +496,7 @@ impl CliArgs { // Start the PSK broker let mut child = Command::new("rosenpass-wireguard-broker-socket-handler") - .args(&["--stream-fd", "3"]) + .args(["--stream-fd", "3"]) .fd_mappings(vec![FdMapping { parent_fd: theirs.as_raw_fd(), child_fd: 3, diff --git a/wireguard-broker/Cargo.toml b/wireguard-broker/Cargo.toml index 0669d46..824980e 100644 --- a/wireguard-broker/Cargo.toml +++ b/wireguard-broker/Cargo.toml @@ -26,6 +26,8 @@ env_logger = { workspace = true } log = { workspace = true } derive_builder = {workspace = true} postcard = {workspace = true} +rustix = { worspace = true, optional = true } +libc = { worspace = true, optional = true } # Mio broker client mio = { workspace = true } @@ -36,7 +38,8 @@ rand = {workspace = true} procspawn = {workspace = true} [features] -experimental_broker_api = [] +experimental_broker_api = ["rustix", "libc"] +experiment_memfd_secret = [] [[bin]] name = "rosenpass-wireguard-broker-privileged" diff --git a/wireguard-broker/src/api/config.rs b/wireguard-broker/src/api/config.rs index c487d7f..97a36f4 100644 --- a/wireguard-broker/src/api/config.rs +++ b/wireguard-broker/src/api/config.rs @@ -2,7 +2,7 @@ use crate::{SerializedBrokerConfig, WG_KEY_LEN, WG_PEER_LEN}; use derive_builder::Builder; use rosenpass_secret_memory::{Public, Secret}; -#[derive(Builder)] +#[derive(Builder, Debug)] #[builder(pattern = "mutable")] //TODO: Use generics for iface, add additional params pub struct NetworkBrokerConfig<'a> { diff --git a/wireguard-broker/src/api/server.rs b/wireguard-broker/src/api/server.rs index 1eaf255..da3c2fe 100644 --- a/wireguard-broker/src/api/server.rs +++ b/wireguard-broker/src/api/server.rs @@ -36,6 +36,7 @@ impl BrokerServer where Inner: WireGuardBroker, msgs::SetPskError: From, + Err: std::fmt::Debug, { pub fn new(inner: Inner) -> Self { Self { inner } @@ -56,9 +57,9 @@ where .ok_or(BrokerServerError::InvalidMessage)?; let mut res = zerocopy::Ref::<&mut [u8], Envelope>::new(res) .ok_or(BrokerServerError::InvalidMessage)?; - - res.payload.return_code = msgs::MsgType::SetPsk as u8; + res.msg_type = msgs::MsgType::SetPsk as u8; self.handle_set_psk(&req.payload, &mut res.payload)?; + Ok(res.bytes().len()) } @@ -83,6 +84,10 @@ where .build() .unwrap(); let r: Result<(), Err> = self.inner.borrow_mut().set_psk(config.into()); + if let Err(e) = &r { + eprintln!("Error setting PSK: {e:?}"); // TODO: Use rust log + } + let r: msgs::SetPskResult = r.map_err(|e| e.into()); let r: msgs::SetPskResponseReturnCode = r.into(); res.return_code = r as u8; diff --git a/wireguard-broker/src/bin/priviledged.rs b/wireguard-broker/src/bin/priviledged.rs index 2086cbe..7a42d21 100644 --- a/wireguard-broker/src/bin/priviledged.rs +++ b/wireguard-broker/src/bin/priviledged.rs @@ -27,6 +27,14 @@ pub mod linux { } pub fn main() -> Result<(), BrokerAppError> { + { + use rosenpass_secret_memory as SM; + #[cfg(feature = "experiment_memfd_secret")] + SM::secret_policy_try_use_memfd_secrets(); + #[cfg(not(feature = "experiment_memfd_secret"))] + SM::secret_policy_use_only_malloc_secrets(); + } + let mut broker = BrokerServer::new(wg::NetlinkWireGuardBroker::new()?); let mut stdin = stdin().lock(); diff --git a/wireguard-broker/src/bin/socket_handler.rs b/wireguard-broker/src/bin/socket_handler.rs index 9638ba5..9f8c4e7 100644 --- a/wireguard-broker/src/bin/socket_handler.rs +++ b/wireguard-broker/src/bin/socket_handler.rs @@ -148,6 +148,14 @@ async fn listen_for_clients(queue: mpsc::Sender, sock: UnixListen async fn on_accept(queue: mpsc::Sender, mut stream: UnixStream) -> Result<()> { let mut req_buf = Vec::new(); + { + use rosenpass_secret_memory as SM; + #[cfg(feature = "experiment_memfd_secret")] + SM::secret_policy_try_use_memfd_secrets(); + #[cfg(not(feature = "experiment_memfd_secret"))] + SM::secret_policy_use_only_malloc_secrets(); + } + loop { stream.readable().await?; diff --git a/wireguard-broker/src/brokers/mio_client.rs b/wireguard-broker/src/brokers/mio_client.rs index 0f1633b..8af7fcf 100644 --- a/wireguard-broker/src/brokers/mio_client.rs +++ b/wireguard-broker/src/brokers/mio_client.rs @@ -1,59 +1,79 @@ -use anyhow::{bail, ensure}; +use anyhow::{bail, Context}; use mio::Interest; -use rosenpass_util::ord::max_usize; -use std::collections::VecDeque; -use std::io::{ErrorKind, Read, Write}; - -use crate::{SerializedBrokerConfig, WireGuardBroker, WireguardBrokerMio}; +use rosenpass_secret_memory::Secret; +use rosenpass_to::{ops::copy_slice_least_src, To}; +use rosenpass_util::io::{IoResultKindHintExt, TryIoResultKindHintExt}; +use rosenpass_util::length_prefix_encoding::decoder::LengthPrefixDecoder; +use rosenpass_util::length_prefix_encoding::encoder::LengthPrefixEncoder; +use rustix::fd::AsFd; +use std::borrow::{Borrow, BorrowMut}; use crate::api::client::{ BrokerClient, BrokerClientIo, BrokerClientPollResponseError, BrokerClientSetPskError, }; -use crate::api::msgs::{self, RESPONSE_MSG_BUFFER_SIZE}; +use crate::{SerializedBrokerConfig, WireGuardBroker, WireguardBrokerMio}; #[derive(Debug)] pub struct MioBrokerClient { inner: BrokerClient, } -const LEN_SIZE: usize = 8; -const RECV_BUF_SIZE: usize = max_usize(LEN_SIZE, RESPONSE_MSG_BUFFER_SIZE); +#[derive(Debug)] +struct SecretBuffer(pub Secret); + +impl SecretBuffer { + fn new() -> Self { + Self(Secret::zero()) + } +} + +impl Borrow<[u8]> for SecretBuffer { + fn borrow(&self) -> &[u8] { + self.0.secret() + } +} + +impl BorrowMut<[u8]> for SecretBuffer { + fn borrow_mut(&mut self) -> &mut [u8] { + self.0.secret_mut() + } +} + +type ReadBuffer = LengthPrefixDecoder>; +type WriteBuffer = LengthPrefixEncoder>; #[derive(Debug)] struct MioBrokerClientIo { socket: mio::net::UnixStream, - send_buf: VecDeque, - recv_state: RxState, - expected_state: RxState, - recv_buf: [u8; RECV_BUF_SIZE], -} - -#[derive(Debug, Clone, Copy, PartialEq)] -enum RxState { - //Recieving size with buffer offset - RxSize(usize), - RxBuffer(usize), + read_buffer: ReadBuffer, + write_buffer: WriteBuffer, } impl MioBrokerClient { pub fn new(socket: mio::net::UnixStream) -> Self { + let read_buffer = LengthPrefixDecoder::new(SecretBuffer::new()); + let write_buffer = LengthPrefixEncoder::from_buffer(SecretBuffer::new()); let io = MioBrokerClientIo { socket, - send_buf: VecDeque::new(), - recv_state: RxState::RxSize(0), - recv_buf: [0u8; RECV_BUF_SIZE], - expected_state: RxState::RxSize(LEN_SIZE), + read_buffer, + write_buffer, }; let inner = BrokerClient::new(io); Self { inner } } - fn poll(&mut self) -> anyhow::Result> { + fn poll(&mut self) -> anyhow::Result<()> { self.inner.io_mut().flush()?; // This sucks - match self.inner.poll_response() { - Ok(res) => Ok(res), + let res = self.inner.poll_response(); + match res { + Ok(None) => Ok(()), + Ok(Some(Ok(()))) => Ok(()), + Ok(Some(Err(e))) => { + log::warn!("Error from PSK broker: {e:?}"); + Ok(()) + } Err(BrokerClientPollResponseError::IoError(e)) => Err(e), Err(BrokerClientPollResponseError::InvalidMessage) => bail!("Invalid message"), } @@ -108,154 +128,101 @@ impl BrokerClientIo for MioBrokerClientIo { type RecvError = anyhow::Error; fn send_msg(&mut self, buf: &[u8]) -> Result<(), Self::SendError> { - self.flush()?; - self.send_or_buffer(&(buf.len() as u64).to_le_bytes())?; - self.send_or_buffer(buf)?; + // Clear write buffer (blocking write) + self.flush_blocking()?; + assert!(self.write_buffer.exhausted(), "flush_blocking() should have put the write buffer in exhausted state. Developer error!"); + + // Emplace new message in write buffer + copy_slice_least_src(buf).to(self.write_buffer.buffer_bytes_mut()); + self.write_buffer + .restart_write_with_new_message(buf.len())?; + + // Give the write buffer a chance to clear self.flush()?; Ok(()) } fn recv_msg(&mut self) -> Result, Self::RecvError> { + use std::io::ErrorKind as K; loop { - match (self.recv_state, self.expected_state) { - //Stale Buffer state or recieved everything - (RxState::RxSize(x), RxState::RxSize(y)) - | (RxState::RxBuffer(x), RxState::RxBuffer(y)) - if x == y => - { - match self.recv_state { - RxState::RxSize(s) => { - let len: &[u8; LEN_SIZE] = self.recv_buf[0..s].try_into().unwrap(); - let len: usize = u64::from_le_bytes(*len) as usize; + match self + .read_buffer + .read_from_stdio(&self.socket) + .try_io_err_kind_hint() + { + Ok(_) => {} // Moved down in the loop + Err((_, Some(K::WouldBlock))) => break Ok(None), + Err((_, Some(K::Interrupted))) => continue, + Err((e, _)) => break Err(e)?, + } - ensure!( - len <= msgs::RESPONSE_MSG_BUFFER_SIZE, - "Oversized buffer ({len}) in psk buffer response." - ); - - self.recv_state = RxState::RxBuffer(0); - self.expected_state = RxState::RxBuffer(len); - continue; - } - RxState::RxBuffer(s) => { - self.recv_state = RxState::RxSize(0); - self.expected_state = RxState::RxSize(LEN_SIZE); - return Ok(Some(&self.recv_buf[0..s])); - } - } - } - - //Recieve if x < y - (RxState::RxSize(x), RxState::RxSize(y)) - | (RxState::RxBuffer(x), RxState::RxBuffer(y)) - if x < y => - { - let bytes = raw_recv(&self.socket, &mut self.recv_buf[x..y])?; - - // If we've received nothing so far, and raw_recv came up empty, - // then let the broker client know nothing came - if self.recv_state == RxState::RxSize(0) && bytes == 0 { - return Ok(None); - } - - if x + bytes == y { - return Ok(Some(&self.recv_buf[0..y])); - } - - // We didn't recieve everything so let's assume something went wrong - self.recv_state = RxState::RxSize(0); - self.expected_state = RxState::RxSize(LEN_SIZE); - bail!("Invalid state"); - } - _ => { - //Reset states - self.recv_state = RxState::RxSize(0); - self.expected_state = RxState::RxSize(LEN_SIZE); - bail!("Invalid state"); - } - }; + // OK case moved here to appease borrow checker + break Ok(self.read_buffer.message()?); } } } impl MioBrokerClientIo { - fn flush(&mut self) -> anyhow::Result<()> { - let (fst, snd) = self.send_buf.as_slices(); + fn flush_blocking(&mut self) -> anyhow::Result<()> { + self.flush()?; + if self.write_buffer.exhausted() { + return Ok(()); + } - let (written, res) = match raw_send(&self.socket, fst) { - Ok(w1) if w1 >= fst.len() => match raw_send(&self.socket, snd) { - Ok(w2) => (w1 + w2, Ok(())), - Err(e) => (w1, Err(e)), - }, - Ok(w1) => (w1, Ok(())), - Err(e) => (0, Err(e)), + log::warn!("Could not flush PSK broker write buffer in non-blocking mode. Flushing in blocking mode!"); + use rustix::io::{fcntl_getfd, fcntl_setfd, FdFlags}; + + // Build O_NONBLOCK + let o_nonblock = { + let v = libc::O_NONBLOCK; + let v = v.try_into().context( + "Could not cast O_NONBLOCK (`{v}`) from libc int (i32?) to rustix int (u32?)", + )?; + FdFlags::from_bits(v).context( + "Could not cast O_NONBLOCK (`{v}`) from rustix int to rustix::io::FdFlags", + )? }; - self.send_buf.drain(..written); + // Determine previous and new file descriptor flags + let flags_orig = fcntl_getfd(self.socket.as_fd())?; + let mut flags_blocking = flags_orig; + flags_blocking.insert(o_nonblock); - self.socket.try_io(|| (&self.socket).flush())?; + // Set file descriptor flags + fcntl_setfd(self.socket.as_fd(), flags_blocking)?; - res + // Blocking write + let res = loop { + if self.write_buffer.exhausted() { + break Ok(()); + } + + match self.flush() { + Ok(_) => {} + Err(e) => break Err(e), + } + }; + + // Restore file descriptor flags + fcntl_setfd(self.socket.as_fd(), flags_orig)?; + + Ok(res?) } - fn send_or_buffer(&mut self, buf: &[u8]) -> anyhow::Result<()> { - let mut off = 0; - - if self.send_buf.is_empty() { - off += raw_send(&self.socket, buf)?; + fn flush(&mut self) -> std::io::Result<()> { + use std::io::ErrorKind as K; + loop { + match self + .write_buffer + .write_to_stdio(&self.socket) + .io_err_kind_hint() + { + Ok(_) => break Ok(()), + Err((_, K::WouldBlock)) => break Ok(()), + Err((_, K::Interrupted)) => continue, + Err((e, _)) => return Err(e)?, + } } - - self.send_buf.extend(buf[off..].iter()); - - Ok(()) } } - -fn raw_send(mut socket: &mio::net::UnixStream, data: &[u8]) -> anyhow::Result { - let mut off = 0; - - socket.try_io(|| { - loop { - if off == data.len() { - return Ok(()); - } - match socket.write(&data[off..]) { - Ok(n) => { - off += n; - } - Err(e) if e.kind() == ErrorKind::Interrupted => { - // pass – retry - } - Err(e) if off > 0 || e.kind() == ErrorKind::WouldBlock => return Ok(()), - Err(e) => return Err(e), - } - } - })?; - - Ok(off) -} - -fn raw_recv(mut socket: &mio::net::UnixStream, out: &mut [u8]) -> anyhow::Result { - let mut off = 0; - - socket.try_io(|| { - loop { - if off == out.len() { - return Ok(()); - } - match socket.read(&mut out[off..]) { - Ok(n) => { - off += n; - } - Err(e) if e.kind() == ErrorKind::Interrupted => { - // pass – retry - } - Err(e) if off > 0 || e.kind() == ErrorKind::WouldBlock => return Ok(()), - Err(e) => return Err(e), - } - } - })?; - - Ok(off) -} diff --git a/wireguard-broker/src/brokers/netlink.rs b/wireguard-broker/src/brokers/netlink.rs index 556bd3c..2f4fa8f 100644 --- a/wireguard-broker/src/brokers/netlink.rs +++ b/wireguard-broker/src/brokers/netlink.rs @@ -79,6 +79,7 @@ impl WireGuardBroker for NetlinkWireGuardBroker { fn set_psk(&mut self, config: SerializedBrokerConfig) -> Result<(), Self::Error> { let config: NetworkBrokerConfig = config .try_into() + // TODO: I think this is the wrong error .map_err(|_e| SetPskError::NoSuchInterface)?; // Ensure that the peer exists by querying the device configuration // TODO: Use InvalidInterfaceError