mirror of
https://github.com/rosenpass/rosenpass.git
synced 2025-12-05 20:40:02 -08:00
Add docs and tests for the decoder module in length_prefix_encoding
This commit is contained in:
@@ -1,3 +1,16 @@
|
||||
//! This module provides utilities for decoding length-prefixed messages from I/O streams.
|
||||
//!
|
||||
//! Messages are prefixed with an unsigned 64-bit little-endian length header, followed by the
|
||||
//! message payload. The [`LengthPrefixDecoder`] is a central component here, maintaining
|
||||
//! internal buffers and state for partial reads and boundary checks.
|
||||
//!
|
||||
//! The module defines errors to handle size mismatches, I/O issues, and boundary violations
|
||||
//! that may occur during decoding.
|
||||
//!
|
||||
//! The abstractions provided in this module enable safe and convenient reading
|
||||
//! of structured data from streams, including handling unexpected EOFs and ensuring messages
|
||||
//! fit within allocated buffers.
|
||||
|
||||
use std::{borrow::BorrowMut, cmp::min, io};
|
||||
|
||||
use thiserror::Error;
|
||||
@@ -8,54 +21,79 @@ use crate::{
|
||||
result::ensure_or,
|
||||
};
|
||||
|
||||
/// Size in bytes of a message header carrying length information
|
||||
/// Size in bytes of the message header carrying length information.
|
||||
/// Currently, HEADER_SIZE is always 8 bytes and encodes a 64-bit little-endian number.
|
||||
pub const HEADER_SIZE: usize = std::mem::size_of::<u64>();
|
||||
|
||||
/// Error enum representing sanity check failures when accessing buffer regions.
|
||||
///
|
||||
/// This error is triggered when internal offsets point outside allowable regions.
|
||||
#[derive(Error, Debug)]
|
||||
/// Error enum to represent various boundary sanity check failures during buffer operations
|
||||
pub enum SanityError {
|
||||
/// The given offset exceeded the read buffer bounds.
|
||||
#[error("Offset is out of read buffer bounds")]
|
||||
/// Error indicating that the given offset exceeds the bounds of the read buffer
|
||||
OutOfBufferBounds,
|
||||
|
||||
/// The given offset exceeded the message buffer bounds.
|
||||
#[error("Offset is out of message buffer bounds")]
|
||||
/// Error indicating that the given offset exceeds the bounds of the message buffer
|
||||
OutOfMessageBounds,
|
||||
}
|
||||
|
||||
/// Error indicating that the message size is larger than the available buffer space.
|
||||
#[derive(Error, Debug)]
|
||||
#[error("Message too large ({msg_size} bytes) for buffer ({buf_size} bytes)")]
|
||||
/// Error indicating that message exceeds available buffer space
|
||||
pub struct MessageTooLargeError {
|
||||
msg_size: usize,
|
||||
buf_size: usize,
|
||||
}
|
||||
|
||||
impl MessageTooLargeError {
|
||||
/// Creates a new MessageTooLargeError with the given message and buffer sizes
|
||||
/// Creates a new `MessageTooLargeError` with the given message and buffer sizes.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::MessageTooLargeError;
|
||||
/// let err = MessageTooLargeError::new(1024, 512);
|
||||
/// assert_eq!(format!("{}", err), "Message too large (1024 bytes) for buffer (512 bytes)");
|
||||
/// ```
|
||||
pub fn new(msg_size: usize, buf_size: usize) -> Self {
|
||||
Self { msg_size, buf_size }
|
||||
}
|
||||
|
||||
/// Ensures that the message size fits within the buffer size
|
||||
/// Ensures the message fits within the given buffer.
|
||||
///
|
||||
/// Returns Ok(()) if the message fits, otherwise returns an error with size details
|
||||
/// Returns `Ok(())` if `msg_size <= buf_size`, otherwise returns a `MessageTooLargeError`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::MessageTooLargeError;
|
||||
/// let result = MessageTooLargeError::ensure(100, 200);
|
||||
/// assert!(result.is_ok());
|
||||
///
|
||||
/// let err = MessageTooLargeError::ensure(300, 200).unwrap_err();
|
||||
/// assert_eq!(format!("{}", err), "Message too large (300 bytes) for buffer (200 bytes)");
|
||||
/// ```
|
||||
pub fn ensure(msg_size: usize, buf_size: usize) -> Result<(), Self> {
|
||||
let err = MessageTooLargeError { msg_size, buf_size };
|
||||
ensure_or(msg_size <= buf_size, err)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return type for `ReadFromIo` operations, containing the number of bytes read and an optional message slice.
|
||||
#[derive(Debug)]
|
||||
/// Return type for ReadFromIo operations that contains the number of bytes read and an optional message slice
|
||||
pub struct ReadFromIoReturn<'a> {
|
||||
/// Number of bytes read from the input
|
||||
/// Number of bytes read.
|
||||
pub bytes_read: usize,
|
||||
/// Optional slice containing the complete message, if one was read
|
||||
/// The complete message slice if fully read, otherwise `None`.
|
||||
pub message: Option<&'a mut [u8]>,
|
||||
}
|
||||
|
||||
impl<'a> ReadFromIoReturn<'a> {
|
||||
/// Creates a new ReadFromIoReturn with the given number of bytes read and optional message slice.
|
||||
/// Creates a new `ReadFromIoReturn`.
|
||||
///
|
||||
/// Generally used internally to represent partial or complete read results.
|
||||
pub fn new(bytes_read: usize, message: Option<&'a mut [u8]>) -> Self {
|
||||
Self {
|
||||
bytes_read,
|
||||
@@ -64,13 +102,17 @@ impl<'a> ReadFromIoReturn<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// An error that may occur when reading from an I/O source.
|
||||
///
|
||||
/// This enum wraps I/O errors and message-size errors, allowing higher-level logic to determine
|
||||
/// if the error is a fundamental I/O problem or a size mismatch issue.
|
||||
#[derive(Debug, Error)]
|
||||
/// An enum representing errors that can occur during read operations from I/O
|
||||
pub enum ReadFromIoError {
|
||||
/// Error occurred while reading from the underlying I/O stream
|
||||
/// Error reading from the underlying I/O stream.
|
||||
#[error("Error reading from the underlying stream")]
|
||||
IoError(#[from] io::Error),
|
||||
/// Error occurred because message size exceeded buffer capacity
|
||||
|
||||
/// The message size exceeded the capacity of the available buffer.
|
||||
#[error("Message size out of buffer bounds")]
|
||||
MessageTooLargeError(#[from] MessageTooLargeError),
|
||||
}
|
||||
@@ -84,11 +126,31 @@ impl TryIoErrorKind for ReadFromIoError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
/// A decoder for length-prefixed messages
|
||||
/// A decoder for length-prefixed messages.
|
||||
///
|
||||
/// This struct provides functionality to decode messages that are prefixed with their length.
|
||||
/// It maintains internal state for header information, the message buffer, and current offset.
|
||||
/// This decoder reads a 64-bit little-endian length prefix followed by the message payload.
|
||||
/// It maintains state so that partial reads from a non-blocking or streaming source can
|
||||
/// accumulate until a full message is available.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use std::io::Cursor;
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::LengthPrefixDecoder;
|
||||
/// let data: Vec<u8> = {
|
||||
/// let mut buf = Vec::new();
|
||||
/// buf.extend_from_slice(&(5u64.to_le_bytes())); // message length = 5
|
||||
/// buf.extend_from_slice(b"hello");
|
||||
/// buf
|
||||
/// };
|
||||
///
|
||||
/// let mut decoder = LengthPrefixDecoder::new(vec![0; 64]);
|
||||
/// let mut cursor = Cursor::new(data);
|
||||
///
|
||||
/// let message = decoder.read_all_from_stdio(&mut cursor).expect("read failed");
|
||||
/// assert_eq!(message, b"hello");
|
||||
/// ```
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct LengthPrefixDecoder<Buf: BorrowMut<[u8]>> {
|
||||
header: [u8; HEADER_SIZE],
|
||||
buf: Buf,
|
||||
@@ -96,33 +158,102 @@ pub struct LengthPrefixDecoder<Buf: BorrowMut<[u8]>> {
|
||||
}
|
||||
|
||||
impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
/// Creates a new LengthPrefixDecoder with the given buffer
|
||||
/// Creates a new `LengthPrefixDecoder` with the provided buffer.
|
||||
///
|
||||
/// The provided buffer must be large enough to hold the expected maximum message size.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::LengthPrefixDecoder;
|
||||
/// let decoder = LengthPrefixDecoder::new(vec![0; 1024]);
|
||||
/// assert_eq!(*decoder.bytes_read(), 0);
|
||||
/// ```
|
||||
pub fn new(buf: Buf) -> Self {
|
||||
let header = Default::default();
|
||||
let off = 0;
|
||||
Self { header, buf, off }
|
||||
}
|
||||
|
||||
/// Clears and zeroes all internal state
|
||||
/// Clears and zeroes all internal state.
|
||||
///
|
||||
/// This zeroizes the header and the buffer, as well as resets the offset to zero.
|
||||
pub fn clear(&mut self) {
|
||||
self.zeroize()
|
||||
}
|
||||
|
||||
/// Creates a new LengthPrefixDecoder from its component parts
|
||||
/// Creates a decoder from parts.
|
||||
///
|
||||
/// Typically used for low-level reconstruction of a decoder state.
|
||||
pub fn from_parts(header: [u8; HEADER_SIZE], buf: Buf, off: usize) -> Self {
|
||||
Self { header, buf, off }
|
||||
}
|
||||
|
||||
/// Consumes the decoder and returns its component parts
|
||||
/// Consumes the decoder and returns its internal parts.
|
||||
///
|
||||
/// Returns the header, the underlying buffer, and the current offset.
|
||||
pub fn into_parts(self) -> ([u8; HEADER_SIZE], Buf, usize) {
|
||||
let Self { header, buf, off } = self;
|
||||
(header, buf, off)
|
||||
}
|
||||
|
||||
/// Reads a complete message from the given reader into the decoder.
|
||||
/// Reads a complete message from the given reader.
|
||||
///
|
||||
/// Retries on interrupts and returns the decoded message buffer on success.
|
||||
/// Returns an error if the read fails or encounters an unexpected EOF.
|
||||
/// Will retry on interrupts and fails if EOF is encountered prematurely. On success,
|
||||
/// returns a mutable slice of the fully read message.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ## Successful read
|
||||
/// ```
|
||||
/// # use std::io::Cursor;
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::{LengthPrefixDecoder, ReadFromIoError, MessageTooLargeError};
|
||||
/// let mut data: Cursor<Vec<u8>> = {
|
||||
/// let mut buf = Vec::new();
|
||||
/// buf.extend_from_slice(&(3u64.to_le_bytes()));
|
||||
/// // The buffer can also be larger than the message size:
|
||||
/// // Here `cats` is 4 bytes and 1 byte longer than the message size defined in the header
|
||||
/// buf.extend_from_slice(b"cats");
|
||||
/// Cursor::new(buf)
|
||||
/// };
|
||||
/// let mut decoder = LengthPrefixDecoder::new(vec![0; 8]);
|
||||
/// let msg = decoder.read_all_from_stdio(&mut data).expect("read failed");
|
||||
/// assert_eq!(msg, b"cat");
|
||||
/// ```
|
||||
///
|
||||
/// ## MessageTooLargeError
|
||||
///
|
||||
/// Buffer of the `LengthPrefixDecoder` configured to be too small:
|
||||
/// ```
|
||||
/// # use std::io::Cursor;
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::{LengthPrefixDecoder, ReadFromIoError, MessageTooLargeError};
|
||||
/// let mut data: Cursor<Vec<u8>> = {
|
||||
/// let mut buf = Vec::new();
|
||||
/// buf.extend_from_slice(&(7u64.to_le_bytes()));
|
||||
/// buf.extend_from_slice(b"giraffe");
|
||||
/// Cursor::new(buf)
|
||||
/// };
|
||||
/// // Buffer is too small, should be at least 7 bytes (defined in the header)
|
||||
/// let mut decoder = LengthPrefixDecoder::new(vec![0; 5]);
|
||||
/// let err = decoder.read_all_from_stdio(&mut data).expect_err("read should have failed");
|
||||
/// assert!(matches!(err, ReadFromIoError::MessageTooLargeError(_)));
|
||||
/// ```
|
||||
///
|
||||
/// ## IOError (EOF)
|
||||
/// ```
|
||||
/// # use std::io::Cursor;
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::{LengthPrefixDecoder, ReadFromIoError, MessageTooLargeError};
|
||||
/// let mut data: Cursor<Vec<u8>> = {
|
||||
/// let mut buf = Vec::new();
|
||||
/// // Message size set to 10 bytes, but the message is only 7 bytes long
|
||||
/// buf.extend_from_slice(&(10u64.to_le_bytes()));
|
||||
/// buf.extend_from_slice(b"giraffe");
|
||||
/// Cursor::new(buf)
|
||||
/// };
|
||||
/// let mut decoder = LengthPrefixDecoder::new(vec![0; 10]);
|
||||
/// let err = decoder.read_all_from_stdio(&mut data).expect_err("read should have failed");
|
||||
/// assert!(matches!(err, ReadFromIoError::IoError(_)));
|
||||
/// ```
|
||||
pub fn read_all_from_stdio<R: io::Read>(
|
||||
&mut self,
|
||||
mut r: R,
|
||||
@@ -153,7 +284,19 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads from the given reader into the decoder's internal buffers
|
||||
/// Attempts to read from the given `Read` source into the decoder.
|
||||
///
|
||||
/// On success, returns how many bytes were read and a mutable slice of the complete message if fully available.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use std::io::Cursor;
|
||||
/// # use rosenpass_util::length_prefix_encoding::decoder::{LengthPrefixDecoder, ReadFromIoReturn};
|
||||
/// let mut data = Cursor::new([4u64.to_le_bytes().as_slice(), b"cats"].concat());
|
||||
/// let mut decoder = LengthPrefixDecoder::new(vec![0; 8]);
|
||||
/// decoder.read_from_stdio(&mut data).expect("read failed");
|
||||
/// ```
|
||||
pub fn read_from_stdio<R: io::Read>(
|
||||
&mut self,
|
||||
mut r: R,
|
||||
@@ -179,7 +322,12 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Gets the next buffer slice that can be written to
|
||||
/// Returns the next slice of internal buffer that needs data.
|
||||
///
|
||||
/// If the header is not yet fully read, returns the remaining part of the header buffer.
|
||||
/// Otherwise, returns the remaining part of the message buffer if the message size is known.
|
||||
///
|
||||
/// If no more data is needed (message fully read), returns `Ok(None)`.
|
||||
pub fn next_slice_to_write_to(&mut self) -> Result<Option<&mut [u8]>, MessageTooLargeError> {
|
||||
fn some_if_nonempty(buf: &mut [u8]) -> Option<&mut [u8]> {
|
||||
match buf.is_empty() {
|
||||
@@ -202,7 +350,9 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Advances the internal offset by the specified number of bytes
|
||||
/// Advances the internal offset by `count` bytes.
|
||||
///
|
||||
/// This checks that the offset does not exceed buffer or message limits.
|
||||
pub fn advance(&mut self, count: usize) -> Result<(), SanityError> {
|
||||
let off = self.off + count;
|
||||
let msg_off = off.saturating_sub(HEADER_SIZE);
|
||||
@@ -220,7 +370,9 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensures that the internal message buffer is large enough for the message size in the header
|
||||
/// Checks that the allocated message buffer is large enough for the message length.
|
||||
///
|
||||
/// If the header is not fully read, this does nothing. If it is, ensures the buffer fits the message.
|
||||
pub fn ensure_sufficient_msg_buffer(&self) -> Result<(), MessageTooLargeError> {
|
||||
let buf_size = self.message_buffer().len();
|
||||
let msg_size = match self.get_header() {
|
||||
@@ -230,53 +382,53 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
MessageTooLargeError::ensure(msg_size, buf_size)
|
||||
}
|
||||
|
||||
/// Returns a reference to the header buffer
|
||||
/// Returns a reference to the header buffer.
|
||||
pub fn header_buffer(&self) -> &[u8] {
|
||||
&self.header[..]
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the header buffer
|
||||
/// Returns a mutable reference to the header buffer.
|
||||
pub fn header_buffer_mut(&mut self) -> &mut [u8] {
|
||||
&mut self.header[..]
|
||||
}
|
||||
|
||||
/// Returns a reference to the message buffer
|
||||
/// Returns a reference to the underlying message buffer.
|
||||
pub fn message_buffer(&self) -> &[u8] {
|
||||
self.buf.borrow()
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the message buffer
|
||||
/// Returns a mutable reference to the underlying message buffer.
|
||||
pub fn message_buffer_mut(&mut self) -> &mut [u8] {
|
||||
self.buf.borrow_mut()
|
||||
}
|
||||
|
||||
/// Returns the number of bytes read so far
|
||||
/// Returns a reference to the total number of bytes read so far.
|
||||
pub fn bytes_read(&self) -> &usize {
|
||||
&self.off
|
||||
}
|
||||
|
||||
/// Consumes the decoder and returns just the message buffer
|
||||
/// Consumes the decoder and returns the underlying buffer.
|
||||
pub fn into_message_buffer(self) -> Buf {
|
||||
let Self { buf, .. } = self;
|
||||
buf
|
||||
}
|
||||
|
||||
/// Returns the current offset into the header buffer
|
||||
/// Returns the current offset into the header buffer.
|
||||
pub fn header_buffer_offset(&self) -> usize {
|
||||
min(self.off, HEADER_SIZE)
|
||||
}
|
||||
|
||||
/// Returns the current offset into the message buffer
|
||||
/// Returns the current offset into the message buffer.
|
||||
pub fn message_buffer_offset(&self) -> usize {
|
||||
self.off.saturating_sub(HEADER_SIZE)
|
||||
}
|
||||
|
||||
/// Returns whether a complete header has been read
|
||||
/// Returns whether the header has been fully read.
|
||||
pub fn has_header(&self) -> bool {
|
||||
self.header_buffer_offset() == HEADER_SIZE
|
||||
}
|
||||
|
||||
/// Returns whether a complete message has been read
|
||||
/// Returns `true` if the entire message has been read, `false` otherwise.
|
||||
pub fn has_message(&self) -> Result<bool, MessageTooLargeError> {
|
||||
self.ensure_sufficient_msg_buffer()?;
|
||||
let msg_size = match self.get_header() {
|
||||
@@ -286,55 +438,55 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
Ok(self.message_buffer_avail().len() == msg_size)
|
||||
}
|
||||
|
||||
/// Returns a slice of the available data in the header buffer
|
||||
/// Returns the currently read portion of the header.
|
||||
pub fn header_buffer_avail(&self) -> &[u8] {
|
||||
let off = self.header_buffer_offset();
|
||||
&self.header_buffer()[..off]
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the available data in the header buffer
|
||||
/// Returns a mutable slice of the currently read portion of the header.
|
||||
pub fn header_buffer_avail_mut(&mut self) -> &mut [u8] {
|
||||
let off = self.header_buffer_offset();
|
||||
&mut self.header_buffer_mut()[..off]
|
||||
}
|
||||
|
||||
/// Returns a slice of the remaining space in the header buffer
|
||||
/// Returns the remaining unread portion of the header.
|
||||
pub fn header_buffer_left(&self) -> &[u8] {
|
||||
let off = self.header_buffer_offset();
|
||||
&self.header_buffer()[off..]
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the remaining space in the header buffer
|
||||
/// Returns a mutable slice of the remaining unread portion of the header.
|
||||
pub fn header_buffer_left_mut(&mut self) -> &mut [u8] {
|
||||
let off = self.header_buffer_offset();
|
||||
&mut self.header_buffer_mut()[off..]
|
||||
}
|
||||
|
||||
/// Returns a slice of the available data in the message buffer
|
||||
/// Returns the currently read portion of the message.
|
||||
pub fn message_buffer_avail(&self) -> &[u8] {
|
||||
let off = self.message_buffer_offset();
|
||||
&self.message_buffer()[..off]
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the available data in the message buffer
|
||||
/// Returns a mutable slice of the currently read portion of the message.
|
||||
pub fn message_buffer_avail_mut(&mut self) -> &mut [u8] {
|
||||
let off = self.message_buffer_offset();
|
||||
&mut self.message_buffer_mut()[..off]
|
||||
}
|
||||
|
||||
/// Returns a slice of the remaining space in the message buffer
|
||||
/// Returns the remaining unread portion of the message buffer.
|
||||
pub fn message_buffer_left(&self) -> &[u8] {
|
||||
let off = self.message_buffer_offset();
|
||||
&self.message_buffer()[off..]
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the remaining space in the message buffer
|
||||
/// Returns a mutable slice of the remaining unread portion of the message buffer.
|
||||
pub fn message_buffer_left_mut(&mut self) -> &mut [u8] {
|
||||
let off = self.message_buffer_offset();
|
||||
&mut self.message_buffer_mut()[off..]
|
||||
}
|
||||
|
||||
/// Returns the message size from the header if available
|
||||
/// Returns the message size from the header if fully read.
|
||||
pub fn get_header(&self) -> Option<usize> {
|
||||
match self.header_buffer_offset() == HEADER_SIZE {
|
||||
false => None,
|
||||
@@ -342,23 +494,23 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the size of the message if header is available
|
||||
/// Returns the message size if known (i.e., if the header is fully read).
|
||||
pub fn message_size(&self) -> Option<usize> {
|
||||
self.get_header()
|
||||
}
|
||||
|
||||
/// Returns the total size of the encoded message including header
|
||||
/// Returns the total size of the encoded message (header + payload) if known.
|
||||
pub fn encoded_message_bytes(&self) -> Option<usize> {
|
||||
self.message_size().map(|sz| sz + HEADER_SIZE)
|
||||
}
|
||||
|
||||
/// Returns a slice of the message fragment if available
|
||||
/// Returns the complete message fragment if the header is known and buffer is sufficient.
|
||||
pub fn message_fragment(&self) -> Result<Option<&[u8]>, MessageTooLargeError> {
|
||||
self.ensure_sufficient_msg_buffer()?;
|
||||
Ok(self.message_size().map(|sz| &self.message_buffer()[..sz]))
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the message fragment if available
|
||||
/// Returns a mutable reference to the complete message fragment.
|
||||
pub fn message_fragment_mut(&mut self) -> Result<Option<&mut [u8]>, MessageTooLargeError> {
|
||||
self.ensure_sufficient_msg_buffer()?;
|
||||
Ok(self
|
||||
@@ -366,14 +518,14 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
.map(|sz| &mut self.message_buffer_mut()[..sz]))
|
||||
}
|
||||
|
||||
/// Returns a slice of the available data in the message fragment
|
||||
/// Returns the portion of the message fragment that has been filled so far.
|
||||
pub fn message_fragment_avail(&self) -> Result<Option<&[u8]>, MessageTooLargeError> {
|
||||
let off = self.message_buffer_avail().len();
|
||||
self.message_fragment()
|
||||
.map(|frag| frag.map(|frag| &frag[..off]))
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the available data in the message fragment
|
||||
/// Returns a mutable portion of the message fragment that has been filled so far.
|
||||
pub fn message_fragment_avail_mut(
|
||||
&mut self,
|
||||
) -> Result<Option<&mut [u8]>, MessageTooLargeError> {
|
||||
@@ -382,28 +534,32 @@ impl<Buf: BorrowMut<[u8]>> LengthPrefixDecoder<Buf> {
|
||||
.map(|frag| frag.map(|frag| &mut frag[..off]))
|
||||
}
|
||||
|
||||
/// Returns a slice of the remaining space in the message fragment
|
||||
/// Returns the remaining portion of the message fragment that still needs to be read.
|
||||
pub fn message_fragment_left(&self) -> Result<Option<&[u8]>, MessageTooLargeError> {
|
||||
let off = self.message_buffer_avail().len();
|
||||
self.message_fragment()
|
||||
.map(|frag| frag.map(|frag| &frag[off..]))
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the remaining space in the message fragment
|
||||
/// Returns a mutable slice of the remaining portion of the message fragment that still needs to be read.
|
||||
pub fn message_fragment_left_mut(&mut self) -> Result<Option<&mut [u8]>, MessageTooLargeError> {
|
||||
let off = self.message_buffer_avail().len();
|
||||
self.message_fragment_mut()
|
||||
.map(|frag| frag.map(|frag| &mut frag[off..]))
|
||||
}
|
||||
|
||||
/// Returns a slice of the complete message if available
|
||||
/// If the entire message is available, returns a reference to it.
|
||||
///
|
||||
/// Otherwise returns `Ok(None)`.
|
||||
pub fn message(&self) -> Result<Option<&[u8]>, MessageTooLargeError> {
|
||||
let sz = self.message_size();
|
||||
self.message_fragment_avail()
|
||||
.map(|frag_opt| frag_opt.and_then(|frag| (frag.len() == sz?).then_some(frag)))
|
||||
}
|
||||
|
||||
/// Returns a mutable slice of the complete message if available
|
||||
/// If the entire message is available, returns a mutable reference to it.
|
||||
///
|
||||
/// Otherwise returns `Ok(None)`.
|
||||
pub fn message_mut(&mut self) -> Result<Option<&mut [u8]>, MessageTooLargeError> {
|
||||
let sz = self.message_size();
|
||||
self.message_fragment_avail_mut()
|
||||
@@ -418,3 +574,88 @@ impl<Buf: BorrowMut<[u8]>> Zeroize for LengthPrefixDecoder<Buf> {
|
||||
self.off.zeroize();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_read_from_stdio() {
|
||||
use std::io::Cursor;
|
||||
let mut data = {
|
||||
let mut buf = Vec::new();
|
||||
buf.extend_from_slice(&(8u64.to_le_bytes()));
|
||||
buf.extend_from_slice(b"cats"); // provide only half of the message
|
||||
Cursor::new(buf)
|
||||
};
|
||||
|
||||
let mut decoder = LengthPrefixDecoder::new(vec![0; 9]);
|
||||
|
||||
fn loop_read(decoder: &mut LengthPrefixDecoder<Vec<u8>>, data: &mut Cursor<Vec<u8>>) {
|
||||
// Read until the buffer is fully read
|
||||
let data_len = data.get_ref().len();
|
||||
loop {
|
||||
let result: ReadFromIoReturn =
|
||||
decoder.read_from_stdio(&mut *data).expect("read failed");
|
||||
if data.position() as usize == data_len {
|
||||
// the entire data was read
|
||||
break;
|
||||
}
|
||||
assert!(result.message.is_none());
|
||||
assert!(result.bytes_read > 0); // at least 1 byte was read (or all data was read)
|
||||
}
|
||||
}
|
||||
|
||||
loop_read(&mut decoder, &mut data);
|
||||
|
||||
// INSERT HERE A TEST FOR EACH INTERNAL METHOD OF LengthPrefixDecoder (decoder)
|
||||
assert_eq!(decoder.message_size(), Some(8));
|
||||
|
||||
// Header-related assertions
|
||||
assert!(decoder.has_header());
|
||||
assert_eq!(decoder.header_buffer_offset(), HEADER_SIZE);
|
||||
assert_eq!(decoder.header_buffer_avail().len(), HEADER_SIZE);
|
||||
assert_eq!(decoder.header_buffer_left().len(), 0);
|
||||
assert_eq!(decoder.get_header(), Some(8));
|
||||
assert_eq!(decoder.message_size(), Some(8));
|
||||
assert_eq!(decoder.encoded_message_bytes(), Some(8 + HEADER_SIZE));
|
||||
|
||||
// Message-related assertions
|
||||
assert_eq!(*decoder.bytes_read(), 12);
|
||||
assert_eq!(decoder.message_buffer_offset(), 4); // "cats" is 4 bytes
|
||||
assert_eq!(decoder.message_buffer_avail(), b"cats");
|
||||
assert_eq!(decoder.message_buffer_left().len(), 5); // buffer size is 9, 4 read -> 5 left
|
||||
assert!(!decoder.has_message().unwrap()); // not fully read
|
||||
|
||||
// Message fragment assertions
|
||||
let frag = decoder.message_fragment().unwrap().unwrap();
|
||||
assert_eq!(frag.len(), 8); // full message fragment slice (not fully filled)
|
||||
let frag_avail = decoder.message_fragment_avail().unwrap().unwrap();
|
||||
assert_eq!(frag_avail, b"cats"); // available portion matches what's read
|
||||
let frag_left = decoder.message_fragment_left().unwrap().unwrap();
|
||||
assert_eq!(frag_left.len(), 4); // 4 bytes remain to complete the message
|
||||
assert_eq!(decoder.message().unwrap(), None); // full message not yet available
|
||||
|
||||
let mut data = Cursor::new(Vec::from(b"dogs"));
|
||||
loop_read(&mut decoder, &mut data);
|
||||
|
||||
// After providing the remaining "dogs" data, the message should now be fully available.
|
||||
assert!(decoder.has_message().unwrap());
|
||||
assert_eq!(decoder.message().unwrap().unwrap(), b"catsdogs");
|
||||
|
||||
// At this point:
|
||||
// - The entire message (8 bytes) plus the header (8 bytes for the length) should be accounted for.
|
||||
assert_eq!(
|
||||
decoder.message_fragment_avail().unwrap().unwrap(),
|
||||
b"catsdogs"
|
||||
);
|
||||
assert!(decoder.message_fragment_left().unwrap().unwrap().is_empty());
|
||||
|
||||
// The offsets and buffers should reflect that everything is read.
|
||||
assert_eq!(decoder.message_buffer_offset(), 8); // all 8 message bytes are now read
|
||||
assert_eq!(decoder.message_buffer_avail(), b"catsdogs");
|
||||
assert_eq!(decoder.message_buffer_left().len(), 1); // buffer size was 9, 8 read -> 1 left unused
|
||||
|
||||
// No more data needed to complete the message.
|
||||
assert!(decoder.next_slice_to_write_to().unwrap().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user