From 31a5dbe4201696228df54a87fd41a516861aeb6e Mon Sep 17 00:00:00 2001
From: Karolin Varner
Date: Fri, 1 Aug 2025 12:01:05 +0200
Subject: [PATCH] feat: Janitor, utilities for cleaning up with tokio

---
 Cargo.lock                  |   2 +
 util/Cargo.toml             |   8 +
 util/src/lib.rs             |   2 +
 util/src/tokio/janitor.rs   | 618 ++++++++++++++++++++++++++++++++++++
 util/src/tokio/local_key.rs |  13 +
 util/src/tokio/mod.rs       |   4 +
 util/tests/janitor.rs       |  85 +++++
 7 files changed, 732 insertions(+)
 create mode 100644 util/src/tokio/janitor.rs
 create mode 100644 util/src/tokio/local_key.rs
 create mode 100644 util/src/tokio/mod.rs
 create mode 100644 util/tests/janitor.rs

diff --git a/Cargo.lock b/Cargo.lock
index 40dda15..8e15916 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2184,11 +2184,13 @@ dependencies = [
  "anyhow",
  "base64ct",
  "libcrux-test-utils",
+ "log",
  "mio",
  "rustix",
  "static_assertions",
  "tempfile",
  "thiserror 1.0.69",
+ "tokio",
  "typenum",
  "uds",
  "zerocopy 0.7.35",
diff --git a/util/Cargo.toml b/util/Cargo.toml
index bdd1ddf..df09eb2 100644
--- a/util/Cargo.toml
+++ b/util/Cargo.toml
@@ -25,7 +25,15 @@ mio = { workspace = true }
 tempfile = { workspace = true }
 uds = { workspace = true, optional = true, features = ["mio_1xx"] }
 libcrux-test-utils = { workspace = true, optional = true }
+tokio = { workspace = true, optional = true, features = [
+    "macros",
+    "rt-multi-thread",
+    "sync",
+    "time",
+] }
+log = { workspace = true }
 
 [features]
 experiment_file_descriptor_passing = ["uds"]
 trace_bench = ["dep:libcrux-test-utils"]
+tokio = ["dep:tokio"]
diff --git a/util/src/lib.rs b/util/src/lib.rs
index 7949a3b..af70d47 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -30,6 +30,8 @@ pub mod option;
 pub mod result;
 /// Time and duration utilities.
 pub mod time;
+#[cfg(feature = "tokio")]
+pub mod tokio;
 /// Trace benchmarking utilities
 #[cfg(feature = "trace_bench")]
 pub mod trace_bench;
diff --git a/util/src/tokio/janitor.rs b/util/src/tokio/janitor.rs
new file mode 100644
index 0000000..91624e7
--- /dev/null
+++ b/util/src/tokio/janitor.rs
@@ -0,0 +1,618 @@
+//! Facilities to spawn tasks that will be reliably executed
+//! before the current tokio context finishes.
+//!
+//! Asynchronous applications often need to manage multiple parallel tasks.
+//! Tokio supports spawning these tasks with [tokio::task::spawn], but when the
+//! tokio event loop exits, all lingering background tasks will be aborted.
+//!
+//! Tokio supports managing multiple parallel tasks, all of which should exit successfully, through
+//! [tokio::task::JoinSet]. This is a useful and very explicit API. To launch a background job,
+//! user code needs to be aware of which JoinSet to use, so this can lead to a JoinSet needing to
+//! be handed around in many parts of the application.
+//!
+//! This level of explicitness avoids bugs, but it can be cumbersome to use and it can introduce a
+//! [function coloring](https://morestina.net/1686/rust-async-is-colored) issue,
+//! creating a strong distinction between functions which have access
+//! to a JoinSet (one color) and those that do not (the other color). Functions with the color
+//! that has access to a JoinSet can call those functions that do not need access, but not the
+//! other way around. This can make refactoring quite difficult: your refactor needs to use a
+//! function that requires a JoinSet? Then have fun spending quite a bit of time recoloring
+//! possibly many parts of your code base.
+//!
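+//! For illustration, a sketch of the JoinSet-passing style this module replaces
+//! (function names are hypothetical):
+//!
+//! ```ignore
+//! use tokio::task::JoinSet;
+//!
+//! // Every function that may spawn background work needs the JoinSet parameter ...
+//! async fn serve_connection(tasks: &mut JoinSet<anyhow::Result<()>>) {
+//!     tasks.spawn(async { /* background work */ Ok(()) });
+//! }
+//!
+//! // ... and so does every function calling it, all the way up the call chain.
+//! async fn run(tasks: &mut JoinSet<anyhow::Result<()>>) {
+//!     serve_connection(tasks).await;
+//! }
+//! ```
+//!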
+//! This module solves this issue by essentially registering a central [JoinSet] through ambient
+//! (semi-global), task-local variables. The mechanism to register this task-local JoinSet is
+//! [tokio::task_local].
+//!
+//! # Error-handling
+//!
+//! The janitor accepts daemons/cleanup jobs which return an [anyhow::Error].
+//! When any daemon returns an error, the entire janitor immediately exits with a failure
+//! without awaiting the other registered tasks.
+//!
+//! The janitor can generally produce errors in three scenarios:
+//!
+//! - A daemon panics
+//! - A daemon returns an error
+//! - An internal error occurs
+//!
+//! When [enter_janitor]/[ensure_janitor] is used to set up a janitor, these functions will always
+//! panic in case of a janitor error. **This also means that these functions panic if any daemon
+//! returns an error.**
+//!
+//! You can explicitly handle janitor errors through [try_enter_janitor]/[try_ensure_janitor].
+//!
+//! # Examples
+//!
+#![doc = "```ignore"]
+#![doc = include_str!("../../tests/janitor.rs")]
+#![doc = "```"]
+
+use std::any::type_name;
+use std::future::Future;
+
+use anyhow::{bail, Context};
+
+use tokio::task::{AbortHandle, JoinError, JoinHandle, JoinSet};
+use tokio::task_local;
+
+use tokio::sync::mpsc::unbounded_channel as janitor_channel;
+
+use crate::tokio::local_key::LocalKeyExt;
+
+/// Type for the message queue from [JanitorClient]/[JanitorSupervisor] to [JanitorAgent]: Receiving side
+type JanitorQueueRx = tokio::sync::mpsc::UnboundedReceiver<JanitorTicket>;
+/// Type for the message queue from [JanitorClient]/[JanitorSupervisor] to [JanitorAgent]: Sending side
+type JanitorQueueTx = tokio::sync::mpsc::UnboundedSender<JanitorTicket>;
+/// Type for the message queue from [JanitorClient]/[JanitorSupervisor] to [JanitorAgent]: Sending side, weak reference
+type WeakJanitorQueueTx = tokio::sync::mpsc::WeakUnboundedSender<JanitorTicket>;
+
+/// Type of the return value for jobs submitted to [spawn_daemon]/[spawn_cleanup_job]
+type CleanupJobResult = anyhow::Result<()>;
+/// Handle by which we internally refer to cleanup jobs submitted by [spawn_daemon]/[spawn_cleanup_job]
+/// to the current [JanitorAgent]
+type CleanupJob = JoinHandle<CleanupJobResult>;
+
+task_local! {
+    /// Handle to the current [JanitorAgent]; this is where [ensure_janitor]/[enter_janitor]
+    /// register the newly created janitor
+    static CURRENT_JANITOR: JanitorClient;
+}
+
+/// Messages supported by [JanitorAgent]
+#[derive(Debug)]
+enum JanitorTicket {
+    /// This message transmits a new cleanup job to the [JanitorAgent]
+    CleanupJob(CleanupJob),
+}
+
+/// Represents the background task which actually manages cleanup jobs.
+///
+/// This is what is started by [enter_janitor]/[ensure_janitor]
+/// and what receives the messages sent by [JanitorSupervisor]/[JanitorClient].
+#[derive(Debug)]
+struct JanitorAgent {
+    /// Background tasks currently registered with this agent.
+    ///
+    /// This contains two types of tasks:
+    ///
+    /// 1. Background jobs launched through [spawn_daemon]/[spawn_cleanup_job]
+    /// 2. A single task waiting for new [JanitorTicket]s being transmitted from a [JanitorSupervisor]/[JanitorClient]
+    tasks: JoinSet<AgentInternalEvent>,
+    /// Whether this [JanitorAgent] will ever receive new [JanitorTicket]s
+    ///
+    /// Communication between [JanitorAgent] and [JanitorSupervisor]/[JanitorClient] uses a message
+    /// queue (see [JanitorQueueTx]/[JanitorQueueRx]/[WeakJanitorQueueTx]), but you may notice that
+    /// the agent does not actually contain a field storing the message queue.
+    /// Instead, to appease the borrow checker, the message queue is moved into the internal
+    /// background task (see [Self::tasks]) that waits for new [JanitorTicket]s.
+    ///
+    /// Since our state machine still needs to know whether that queue is closed, we maintain this
+    /// flag.
+    ///
+    /// See [AgentInternalEvent::TicketQueueClosed].
+    ticket_queue_closed: bool,
+}
+
+/// These are the return values (events) returned by [JanitorAgent] internal tasks (see
+/// [JanitorAgent::tasks]).
+#[derive(Debug)]
+enum AgentInternalEvent {
+    /// Notifies the [JanitorAgent] state machine that a cleanup job finished successfully.
+    ///
+    /// Sent by genuine background tasks registered through [spawn_daemon]/[spawn_cleanup_job].
+    CleanupJobSuccessful,
+    /// Notifies the [JanitorAgent] state machine that a cleanup job finished with a tokio
+    /// [JoinError].
+    ///
+    /// Sent by genuine background tasks registered through [spawn_daemon]/[spawn_cleanup_job].
+    CleanupJobJoinError(JoinError),
+    /// Notifies the [JanitorAgent] state machine that a cleanup job returned an error.
+    ///
+    /// Sent by genuine background tasks registered through [spawn_daemon]/[spawn_cleanup_job].
+    CleanupJobReturnedError(anyhow::Error),
+    /// Notifies the [JanitorAgent] state machine that a new cleanup job was received through the
+    /// ticket queue.
+    ///
+    /// Sent by the background task managing the ticket queue.
+    ReceivedCleanupJob(JanitorQueueRx, CleanupJob),
+    /// Notifies the [JanitorAgent] state machine that the ticket queue has been closed and that
+    /// no further cleanup jobs will arrive.
+    ///
+    /// Sent by the background task managing the ticket queue.
+    ///
+    /// See [JanitorAgent::ticket_queue_closed].
+    TicketQueueClosed,
+}
+
+impl JanitorAgent {
+    /// Create a new agent. Start with [Self::start].
+    fn new() -> Self {
+        let tasks = JoinSet::new();
+        let ticket_queue_closed = false;
+        Self {
+            tasks,
+            ticket_queue_closed,
+        }
+    }
+
+    /// Main entry point for the [JanitorAgent]. Launches the background task and returns a [JanitorSupervisor]
+    /// which can be used to send tickets to the agent and to wait for agent termination.
+    pub async fn start() -> JanitorSupervisor {
+        let (queue_tx, queue_rx) = janitor_channel();
+        let join_handle = tokio::spawn(async move { Self::new().event_loop(queue_rx).await });
+        JanitorSupervisor::new(join_handle, queue_tx)
+    }
+
+    /// Event loop, processing events from the ticket queue and from [Self::tasks]
+    async fn event_loop(&mut self, queue_rx: JanitorQueueRx) -> anyhow::Result<()> {
+        // Seed the internal task list with a single task to receive tickets from the ticket queue
+        self.spawn_internal_recv_ticket_task(queue_rx).await;
+
+        // Process all incoming events until handle_one_event indicates there are
+        // no more events to process
+        while self.handle_one_event().await?.is_some() {}
+
+        Ok(())
+    }
+
+    /// Process events from [Self::tasks] (and by proxy from the ticket queue)
+    ///
+    /// This is the agent's main state machine.
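+    ///
+    /// In summary, the state machine reacts to events as follows:
+    ///
+    /// - A cleanup job finished successfully: keep going
+    /// - A new cleanup job arrived: register it and re-arm the ticket receiver task
+    /// - The ticket queue closed: stop accepting new jobs, keep draining the remaining tasks
+    /// - The task set is empty and the ticket queue is closed: terminate successfully
+    /// - Anything else (a job error, a panic, a cancellation): abort with an error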
+    async fn handle_one_event(&mut self) -> anyhow::Result<Option<()>> {
+        use AgentInternalEvent as E;
+        match (self.tasks.join_next().await, self.ticket_queue_closed) {
+            // Normal, successful operation
+
+            // Cleanup job exited successfully, no action necessary
+            (Some(Ok(E::CleanupJobSuccessful)), _) => Ok(Some(())),
+
+            // New cleanup job scheduled, add to task list and wait for another task
+            (Some(Ok(E::ReceivedCleanupJob(queue_rx, job))), _) => {
+                self.spawn_internal_recv_ticket_task(queue_rx).await;
+                self.spawn_internal_cleanup_task(job).await;
+                Ok(Some(()))
+            }
+
+            // Ticket queue is closed; now we are just waiting for the remaining cleanup jobs
+            // to terminate
+            (Some(Ok(E::TicketQueueClosed)), _) => {
+                self.ticket_queue_closed = true;
+                Ok(Some(()))
+            }
+
+            // No more tasks in the task manager and the ticket queue is already closed.
+            // This just means we are done and can finally terminate the janitor agent
+            (Option::None, true) => Ok(None),
+
+            // Error handling
+
+            // User callback errors
+
+            // Some cleanup job returned an error as a result
+            (Some(Ok(E::CleanupJobReturnedError(err))), _) => Err(err).with_context(|| {
+                format!("Error in cleanup job handled by {}", type_name::<Self>())
+            }),
+
+            // JoinError produced by the user task: the user task was cancelled
+            (Some(Ok(E::CleanupJobJoinError(err))), _) if err.is_cancelled() => {
+                Err(err).with_context(|| {
+                    format!(
+                        "Error in cleanup job handled by {me}; the cleanup task was cancelled. \
+                        This should not happen and likely indicates a developer error in {me}.",
+                        me = type_name::<Self>()
+                    )
+                })
+            }
+
+            // JoinError produced by the user task: the user task panicked
+            (Some(Ok(E::CleanupJobJoinError(err))), _) => Err(err).with_context(|| {
+                format!(
+                    "Error in cleanup job handled by {}; looks like the cleanup task panicked.",
+                    type_name::<Self>()
+                )
+            }),
+
+            // Internal errors: internal task error
+
+            // JoinError produced by JoinSet::join_next(): the internal task was cancelled
+            (Some(Err(err)), _) if err.is_cancelled() => Err(err).with_context(|| {
+                format!(
+                    "Internal error in {me}; internal async task was cancelled. \
+                    This is probably a developer error in {me}.",
+                    me = type_name::<Self>()
+                )
+            }),
+
+            // JoinError produced by JoinSet::join_next(): the internal task panicked
+            (Some(Err(err)), _) => Err(err).with_context(|| {
+                format!(
+                    "Internal error in {me}; internal async task panicked. \
+                    This is probably a developer error in {me}.",
+                    me = type_name::<Self>()
+                )
+            }),
+
+            // Internal errors: state machine failure
+
+            // No tasks left, but ticket queue was not drained
+            (Option::None, false) => bail!(
+                "Internal error in {me}::handle_one_event(); \
+                there are no more internal tasks active, but the ticket queue was not drained. \
+                The {me}::handle_one_event() code is deliberately designed to never leave the internal task set empty; \
+                instead, there should always be one task to receive new cleanup jobs from the ticket queue unless the \
+                ticket queue has been closed. \
+                This is probably a developer error.",
+                me = type_name::<Self>()
+            ),
+        }
+    }
+
+    /// Used by [Self::event_loop] and [Self::handle_one_event] to start the internal
+    /// task waiting for tickets on the ticket queue.
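+    ///
+    /// Note that the receiver task takes ownership of `queue_rx` and hands it back through
+    /// [AgentInternalEvent::ReceivedCleanupJob], so that [Self::handle_one_event] can spawn
+    /// the next receiver task; this way there is always exactly one task listening on the
+    /// ticket queue until it is closed.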
+    async fn spawn_internal_recv_ticket_task(
+        &mut self,
+        mut queue_rx: JanitorQueueRx,
+    ) -> AbortHandle {
+        self.tasks.spawn(async {
+            use AgentInternalEvent as E;
+            use JanitorTicket as T;
+
+            let ticket = queue_rx.recv().await;
+            match ticket {
+                Some(T::CleanupJob(job)) => E::ReceivedCleanupJob(queue_rx, job),
+                Option::None => E::TicketQueueClosed,
+            }
+        })
+    }
+
+    /// Used by [Self::event_loop] and [Self::handle_one_event] to register
+    /// background daemons/cleanup jobs submitted via [JanitorTicket]
+    async fn spawn_internal_cleanup_task(&mut self, job: CleanupJob) -> AbortHandle {
+        self.tasks.spawn(async {
+            use AgentInternalEvent as E;
+            match job.await {
+                Ok(Ok(())) => E::CleanupJobSuccessful,
+                Ok(Err(e)) => E::CleanupJobReturnedError(e),
+                Err(e) => E::CleanupJobJoinError(e),
+            }
+        })
+    }
+}
+
+/// Client for [JanitorAgent]. Allows [JanitorTicket]s (background jobs)
+/// to be transmitted to the current [JanitorAgent].
+///
+/// This is stored in [CURRENT_JANITOR] as a task-local variable.
+#[derive(Debug)]
+struct JanitorClient {
+    /// Queue we can use to send messages to the current janitor
+    queue_tx: WeakJanitorQueueTx,
+}
+
+impl JanitorClient {
+    /// Create a new client. Use through [JanitorSupervisor::get_client]
+    fn new(queue_tx: WeakJanitorQueueTx) -> Self {
+        Self { queue_tx }
+    }
+
+    /// Has the associated [JanitorAgent] shut down?
+    pub fn is_closed(&self) -> bool {
+        self.queue_tx
+            .upgrade()
+            .map(|channel| channel.is_closed())
+            // If the upgrade fails, all strong senders are gone,
+            // so the channel counts as closed
+            .unwrap_or(true)
+    }
+
+    /// Spawn a new cleanup job/daemon with the associated [JanitorAgent].
+    ///
+    /// Used internally by [spawn_daemon]/[spawn_cleanup_job].
+    pub fn spawn_cleanup_task<F>(&self, future: F) -> Result<(), TrySpawnCleanupJobError>
+    where
+        F: Future<Output = anyhow::Result<()>> + Send + 'static,
+    {
+        let background_task = tokio::spawn(future);
+        self.queue_tx
+            .upgrade()
+            .ok_or(TrySpawnCleanupJobError::ActiveJanitorTerminating)?
+            .send(JanitorTicket::CleanupJob(background_task))
+            .map_err(|_| TrySpawnCleanupJobError::ActiveJanitorTerminating)
+    }
+}
+
+/// Supervisor for [JanitorAgent]. Allows waiting for [JanitorAgent] termination as well as creating
+/// [JanitorClient]s, which in turn can be used to submit background daemons/cleanup jobs
+/// to the agent.
+#[derive(Debug)]
+struct JanitorSupervisor {
+    /// Represents the tokio task associated with the [JanitorAgent].
+    ///
+    /// We use this to wait for [JanitorAgent] termination in [enter_janitor]/[ensure_janitor]
+    agent_join_handle: CleanupJob,
+    /// Queue we can use to send messages to the current janitor
+    queue_tx: JanitorQueueTx,
+}
+
+impl JanitorSupervisor {
+    /// Create a new janitor supervisor. Use through [JanitorAgent::start]
+    pub fn new(agent_join_handle: CleanupJob, queue_tx: JanitorQueueTx) -> Self {
+        Self {
+            agent_join_handle,
+            queue_tx,
+        }
+    }
+
+    /// Create a [JanitorClient] for submitting background daemons/cleanup jobs
+    pub fn get_client(&self) -> JanitorClient {
+        JanitorClient::new(self.queue_tx.clone().downgrade())
+    }
+
+    /// Initiate [JanitorAgent] termination by closing the ticket queue,
+    /// then wait for the agent to finish
+    pub async fn terminate_janitor(self) -> anyhow::Result<()> {
+        std::mem::drop(self.queue_tx);
+        self.agent_join_handle.await?
+    }
+}
+
+/// Return value of [try_enter_janitor].
+#[derive(Debug)]
+pub struct EnterJanitorResult<T, E> {
+    /// The result produced by the janitor itself.
+    ///
+    /// This may contain an error if one of the background daemons/cleanup tasks returned an error,
+    /// panicked, or in case there is an internal error in the janitor.
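+    ///
+    /// Note that this is tracked independently of [Self::callee_result]; the janitor and the
+    /// callee can fail independently of each other.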
+    pub janitor_result: anyhow::Result<()>,
+    /// Contains the result of the future passed to [try_enter_janitor].
+    pub callee_result: Result<T, E>,
+}
+
+impl<T, E> EnterJanitorResult<T, E> {
+    /// Create a new result from its components
+    pub fn new(janitor_result: anyhow::Result<()>, callee_result: Result<T, E>) -> Self {
+        Self {
+            janitor_result,
+            callee_result,
+        }
+    }
+
+    /// Turn this named type into a tuple
+    pub fn into_tuple(self) -> (anyhow::Result<()>, Result<T, E>) {
+        (self.janitor_result, self.callee_result)
+    }
+
+    /// Panic if [Self::janitor_result] contains an error, returning [Self::callee_result]
+    /// otherwise.
+    ///
+    /// If this panics and both [Self::janitor_result] and [Self::callee_result] contain an error,
+    /// this will print both errors.
+    pub fn unwrap_janitor_result(self) -> Result<T, E>
+    where
+        E: std::fmt::Debug,
+    {
+        let me: EnsureJanitorResult<T, E> = self.into();
+        me.unwrap_janitor_result()
+    }
+
+    /// Panic if [Self::janitor_result] or [Self::callee_result] contain an error,
+    /// returning the Ok value of [Self::callee_result].
+    ///
+    /// If this panics and both [Self::janitor_result] and [Self::callee_result] contain an error,
+    /// this will print both errors.
+    pub fn unwrap(self) -> T
+    where
+        E: std::fmt::Debug,
+    {
+        let me: EnsureJanitorResult<T, E> = self.into();
+        me.unwrap()
+    }
+}
+
+/// Return value of [try_ensure_janitor]. The only difference compared to [EnterJanitorResult]
+/// is that [Self::janitor_result] contains None in case an ambient janitor already existed.
+#[derive(Debug)]
+pub struct EnsureJanitorResult<T, E> {
+    /// See [EnterJanitorResult::janitor_result]
+    ///
+    /// This is:
+    ///
+    /// - `None` if a pre-existing ambient janitor was used
+    /// - `Some(Ok(()))` if a new janitor had to be created and it exited successfully
+    /// - `Some(Err(...))` if a new janitor had to be created and it exited with an error
+    pub janitor_result: Option<anyhow::Result<()>>,
+    /// See [EnterJanitorResult::callee_result]
+    pub callee_result: Result<T, E>,
+}
+
+impl<T, E> EnsureJanitorResult<T, E> {
+    /// See [EnterJanitorResult::new]
+    pub fn new(janitor_result: Option<anyhow::Result<()>>, callee_result: Result<T, E>) -> Self {
+        Self {
+            janitor_result,
+            callee_result,
+        }
+    }
+
+    /// Sets up an [EnsureJanitorResult] with [EnsureJanitorResult::janitor_result] = None.
+    pub fn from_callee_result(callee_result: Result<T, E>) -> Self {
+        Self::new(None, callee_result)
+    }
+
+    /// Turn this named type into a tuple
+    pub fn into_tuple(self) -> (Option<anyhow::Result<()>>, Result<T, E>) {
+        (self.janitor_result, self.callee_result)
+    }
+
+    /// See [EnterJanitorResult::unwrap_janitor_result]
+    ///
+    /// If [Self::janitor_result] is None, this won't panic.
+    pub fn unwrap_janitor_result(self) -> Result<T, E>
+    where
+        E: std::fmt::Debug,
+    {
+        match self.into_tuple() {
+            (Some(Ok(())) | None, res) => res,
+            (Some(Err(err)), Ok(_)) => panic!(
+                "Callee in enter_janitor()/ensure_janitor() was successful, \
+                but the janitor or some of its daemons failed: {err:?}"
+            ),
+            (Some(Err(jerr)), Err(cerr)) => panic!(
+                "Both the callee and the janitor or \
+                some of its daemons failed in enter_janitor()/ensure_janitor():\n\
+                \n\
+                Janitor/Daemon error: {jerr:?}\n\
+                \n\
+                Callee error: {cerr:?}"
+            ),
+        }
+    }
+
+    /// See [EnterJanitorResult::unwrap]
+    ///
+    /// If [Self::janitor_result] is None, this is not considered a failure.
+    pub fn unwrap(self) -> T
+    where
+        E: std::fmt::Debug,
+    {
+        match self.unwrap_janitor_result() {
+            Ok(val) => val,
+            Err(err) => panic!(
+                "The janitor and its daemons in enter_janitor()/ensure_janitor() were successful, \
+                but the callee itself failed: {err:?}"
+            ),
+        }
+    }
+}
+
+impl<T, E> From<EnterJanitorResult<T, E>> for EnsureJanitorResult<T, E> {
+    fn from(val: EnterJanitorResult<T, E>) -> Self {
+        EnsureJanitorResult::new(Some(val.janitor_result), val.callee_result)
+    }
+}
+
+/// Non-panicking version of [enter_janitor].
+pub async fn try_enter_janitor<T, E, F>(future: F) -> EnterJanitorResult<T, E>
+where
+    T: 'static,
+    F: Future<Output = Result<T, E>> + 'static,
+{
+    let janitor_handle = JanitorAgent::start().await;
+    let callee_result = CURRENT_JANITOR
+        .scope(janitor_handle.get_client(), future)
+        .await;
+    let janitor_result = janitor_handle.terminate_janitor().await;
+    EnterJanitorResult::new(janitor_result, callee_result)
+}
+
+/// Non-panicking version of [ensure_janitor]
+pub async fn try_ensure_janitor<T, E, F>(future: F) -> EnsureJanitorResult<T, E>
+where
+    T: 'static,
+    F: Future<Output = Result<T, E>> + 'static,
+{
+    match CURRENT_JANITOR.is_set() {
+        true => EnsureJanitorResult::from_callee_result(future.await),
+        false => try_enter_janitor(future).await.into(),
+    }
+}
+
+/// Register a janitor that can be used to register background daemons/cleanup jobs **only within
+/// the future passed to this**.
+///
+/// The function will wait for both the given future and all background jobs registered with the
+/// janitor to terminate.
+///
+/// For a version that does not panic, see [try_enter_janitor].
+pub async fn enter_janitor<T, E, F>(future: F) -> Result<T, E>
+where
+    T: 'static,
+    E: std::fmt::Debug,
+    F: Future<Output = Result<T, E>> + 'static,
+{
+    try_enter_janitor(future).await.unwrap_janitor_result()
+}
+
+/// Variant of [enter_janitor] that will first check if a janitor already exists.
+/// A new janitor is only set up if no janitor has been previously registered.
+pub async fn ensure_janitor<T, E, F>(future: F) -> Result<T, E>
+where
+    T: 'static,
+    E: std::fmt::Debug,
+    F: Future<Output = Result<T, E>> + 'static,
+{
+    try_ensure_janitor(future).await.unwrap_janitor_result()
+}
+
+/// Error returned by [try_spawn_cleanup_job]
+#[derive(thiserror::Error, Debug)]
+pub enum TrySpawnCleanupJobError {
+    /// No active janitor exists
+    #[error("No janitor registered. Did the developer forget to call enter_janitor(…) or ensure_janitor(…)?")]
+    NoActiveJanitor,
+    /// The currently active janitor is in the process of terminating
+    #[error("There is a registered janitor, but it is currently in the process of terminating and won't accept new tasks.")]
+    ActiveJanitorTerminating,
+}
+
+/// Check whether a janitor has been set up with [enter_janitor]/[ensure_janitor]
+pub fn has_active_janitor() -> bool {
+    CURRENT_JANITOR
+        // A janitor that is already shutting down does not count as active
+        .try_with(|client| !client.is_closed())
+        .unwrap_or(false)
+}
+
+/// Non-panicking variant of [spawn_cleanup_job].
+///
+/// This function is available under two names; see [spawn_cleanup_job] for details about this:
+///
+/// 1. [try_spawn_cleanup_job]
+/// 2. [try_spawn_daemon]
+pub fn try_spawn_cleanup_job<F>(future: F) -> Result<(), TrySpawnCleanupJobError>
+where
+    F: Future<Output = anyhow::Result<()>> + Send + 'static,
+{
+    CURRENT_JANITOR
+        .try_with(|client| client.spawn_cleanup_task(future))
+        .map_err(|_| TrySpawnCleanupJobError::NoActiveJanitor)??;
+    Ok(())
+}
+
+/// Register a cleanup job or a daemon with the current janitor registered through
+/// [enter_janitor]/[ensure_janitor].
+///
+/// This function is available under two names:
+///
+/// 1. [spawn_cleanup_job]
+/// 2. [spawn_daemon]
+///
+/// The first name should be used in destructors and to spawn cleanup actions which immediately
+/// begin their task.
+///
+/// The second name should be used for any other tasks, e.g. when the janitor setup is used to
+/// manage multiple parallel jobs, all of which must be waited for.
+pub fn spawn_cleanup_job<F>(future: F)
+where
+    F: Future<Output = anyhow::Result<()>> + Send + 'static,
+{
+    if let Err(e) = try_spawn_cleanup_job(future) {
+        panic!("Could not spawn cleanup job/daemon: {e:?}");
+    }
+}
+
+pub use spawn_cleanup_job as spawn_daemon;
+pub use try_spawn_cleanup_job as try_spawn_daemon;
diff --git a/util/src/tokio/local_key.rs b/util/src/tokio/local_key.rs
new file mode 100644
index 0000000..e1c11ae
--- /dev/null
+++ b/util/src/tokio/local_key.rs
@@ -0,0 +1,13 @@
+//! Helpers for [tokio::task::LocalKey]
+
+/// Extension trait for [tokio::task::LocalKey]
+pub trait LocalKeyExt {
+    /// Check whether a tokio LocalKey is set
+    fn is_set(&'static self) -> bool;
+}
+
+impl<T: 'static> LocalKeyExt for tokio::task::LocalKey<T> {
+    fn is_set(&'static self) -> bool {
+        self.try_with(|_| ()).is_ok()
+    }
+}
diff --git a/util/src/tokio/mod.rs b/util/src/tokio/mod.rs
new file mode 100644
index 0000000..d257293
--- /dev/null
+++ b/util/src/tokio/mod.rs
@@ -0,0 +1,4 @@
+//! Tokio-related utilities
+
+pub mod janitor;
+pub mod local_key;
diff --git a/util/tests/janitor.rs b/util/tests/janitor.rs
new file mode 100644
index 0000000..0661203
--- /dev/null
+++ b/util/tests/janitor.rs
@@ -0,0 +1,85 @@
+#![cfg(feature = "tokio")]
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use tokio::time::sleep;
+
+use rosenpass_util::tokio::janitor::{enter_janitor, spawn_cleanup_job, try_spawn_daemon};
+
+#[tokio::test]
+async fn janitor_demo() -> anyhow::Result<()> {
+    let count = Arc::new(AtomicUsize::new(0));
+
+    // Make sure the program has access to an ambient janitor
+    {
+        let count = count.clone();
+        enter_janitor(async move {
+            let _drop_guard = AsyncDropDemo::new(count.clone()).await;
+
+            // Start a background job
+            {
+                let count = count.clone();
+                try_spawn_daemon(async move {
+                    for _ in 0..17 {
+                        count.fetch_add(1, Ordering::Relaxed);
+                        sleep(Duration::from_micros(200)).await;
+                    }
+                    Ok(())
+                })?;
+            }
+
+            // Start another
+            {
+                let count = count.clone();
+                try_spawn_daemon(async move {
+                    for _ in 0..6 {
+                        count.fetch_add(100, Ordering::Relaxed);
+                        sleep(Duration::from_micros(800)).await;
+                    }
+                    Ok(())
+                })?;
+            }
+
+            // Note how this function just starts a couple of background jobs, but exits immediately
+
+            anyhow::Ok(())
+        })
+    }
+    .await;
+
+    // At this point, all background jobs have finished; now we can check the result of all our
+    // additions
+    assert_eq!(count.load(Ordering::Acquire), 41617);
+
+    Ok(())
+}
+
+/// Demo of how the janitor can be used to implement async destructors
+struct AsyncDropDemo {
+    count: Arc<AtomicUsize>,
+}
+
+impl AsyncDropDemo {
+    async fn new(count: Arc<AtomicUsize>) -> Self {
+        count.fetch_add(1000, Ordering::Relaxed);
+        sleep(Duration::from_micros(50)).await;
+        AsyncDropDemo { count }
+    }
+}
+
+impl Drop for AsyncDropDemo {
+    fn drop(&mut self) {
+        let count = self.count.clone();
+        // Drop cannot be async, so this necessarily uses the panicking variant;
+        // we use spawn_cleanup_job because this makes more semantic sense in this context
+        spawn_cleanup_job(async move {
+            for _ in 0..4 {
+                count.fetch_add(10000, Ordering::Relaxed);
+                sleep(Duration::from_micros(800)).await;
+            }
+            Ok(())
+        })
+    }
+}
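+
+// A minimal sketch of explicit error handling via try_enter_janitor, assuming the
+// error-propagation semantics documented in janitor.rs: a failing daemon surfaces in
+// janitor_result, while the callee's own result is reported separately.
+#[tokio::test]
+async fn janitor_error_propagation_sketch() {
+    use rosenpass_util::tokio::janitor::{try_enter_janitor, try_spawn_daemon};
+
+    let res = try_enter_janitor(async {
+        // This daemon fails immediately ...
+        try_spawn_daemon(async { anyhow::bail!("daemon failed") })?;
+        anyhow::Ok(())
+    })
+    .await;
+
+    // ... so the janitor reports an error, while the callee itself succeeded
+    assert!(res.janitor_result.is_err());
+    assert!(res.callee_result.is_ok());
+}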