wasmtime/runtime/component/
concurrent.rs

1use {
2    crate::AsContextMut,
3    anyhow::Result,
4    futures::{stream::FuturesUnordered, FutureExt},
5    std::{boxed::Box, future::Future, pin::Pin},
6};
7
8pub use futures_and_streams::{ErrorContext, FutureReader, StreamReader};
9
10mod futures_and_streams;
11
/// A handle to the eventual result of a concurrent operation.
///
/// A `Promise` resembles a [`std::future::Future`], with one difference: the
/// underlying operation needs exclusive access to a store in order to make
/// progress, yet it does not monopolize that store for the whole lifetime of
/// the operation.
///
/// Internally this is a newtype around a pinned, boxed future which is
/// `Send + Sync + 'static` and resolves to a `T`.
pub struct Promise<T>(Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>);
19
20impl<T: 'static> Promise<T> {
21    /// Map the result of this `Promise` from one value to another.
22    pub fn map<U>(self, fun: impl FnOnce(T) -> U + Send + Sync + 'static) -> Promise<U> {
23        Promise(Box::pin(self.0.map(fun)))
24    }
25
26    /// Convert this `Promise` to a future which may be `await`ed for its
27    /// result.
28    ///
29    /// The returned future will require exclusive use of the store until it
30    /// completes.  If you need to await more than one `Promise` concurrently,
31    /// use [`PromisesUnordered`].
32    pub async fn get<U: Send>(self, store: impl AsContextMut<Data = U>) -> Result<T> {
33        _ = store;
34        todo!()
35    }
36
37    /// Convert this `Promise` to a future which may be `await`ed for its
38    /// result.
39    ///
40    /// Unlike [`Self::get`], this does _not_ take a store parameter, meaning
41    /// the returned future will not make progress until and unless the event
42    /// loop for the store it came from is polled.  Thus, this method should
43    /// only be used from within host functions and not from top-level embedder
44    /// code.
45    pub fn into_future(self) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> {
46        self.0
47    }
48}
49
50/// Represents a collection of zero or more concurrent operations.
51///
52/// Similar to [`futures::stream::FuturesUnordered`], this type supports
53/// `await`ing more than one [`Promise`]s concurrently.
54pub struct PromisesUnordered<T>(
55    FuturesUnordered<Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>>,
56);
57
58impl<T: 'static> PromisesUnordered<T> {
59    /// Create a new `PromisesUnordered` with no entries.
60    pub fn new() -> Self {
61        Self(FuturesUnordered::new())
62    }
63
64    /// Add the specified [`Promise`] to this collection.
65    pub fn push(&mut self, promise: Promise<T>) {
66        self.0.push(promise.0)
67    }
68
69    /// Get the next result from this collection, if any.
70    pub async fn next<U: Send>(&mut self, store: impl AsContextMut<Data = U>) -> Result<Option<T>> {
71        _ = store;
72        todo!()
73    }
74}