wasmtime/runtime/component/concurrent/
abort.rs

1use std::future;
2use std::pin::pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll, Waker};
5
6/// Handle to a task which may be used to abort it.
7///
8/// This represents a handle to a running task which can be cancelled with
9/// [`AbortHandle::abort`].
10pub struct AbortHandle {
11    state: Arc<Mutex<AbortState>>,
12}
13
/// State shared between an `AbortHandle` and the future it controls.
///
/// The default value is "not aborted, no waker registered".
#[derive(Default)]
struct AbortState {
    /// Set once an abort has been requested; never reset afterwards.
    aborted: bool,
    /// Waker of the task to notify when an abort is requested, if one is
    /// currently registered.
    waker: Option<Waker>,
}
19
20impl AbortHandle {
21    /// Abort the task.
22    ///
23    /// This flags the connected task should abort in the near future, but note
24    /// that if this is called while the future is being polled then that call
25    /// will still complete.
26    pub fn abort(&self) {
27        let waker = {
28            let mut state = self.state.lock().unwrap();
29            state.aborted = true;
30            state.waker.take()
31        };
32        if let Some(waker) = waker {
33            waker.wake();
34        }
35    }
36
37    fn is_aborted(&self, cx: &mut Context<'_>) -> bool {
38        let mut state = self.state.lock().unwrap();
39        if state.aborted {
40            return true;
41        }
42        state.waker = Some(cx.waker().clone());
43        false
44    }
45
46    /// Wraps the `future` provided in a new future which is "abortable" where
47    /// if the returned `AbortHandle` is flagged then the future will resolve
48    /// ASAP with `None` and drop the provided `future`.
49    pub(crate) fn run<F>(future: F) -> (AbortHandle, impl Future<Output = Option<F::Output>>)
50    where
51        F: Future,
52    {
53        let handle = AbortHandle {
54            state: Default::default(),
55        };
56        let handle2 = AbortHandle {
57            state: handle.state.clone(),
58        };
59        let future = async move {
60            let mut future = pin!(future);
61            future::poll_fn(|cx| {
62                if handle2.is_aborted(cx) {
63                    return Poll::Ready(None);
64                }
65                future.as_mut().poll(cx).map(Some)
66            })
67            .await
68        };
69        (handle, future)
70    }
71}