wasmtime/runtime/component/concurrent.rs
1use {
2 crate::AsContextMut,
3 anyhow::Result,
4 futures::{stream::FuturesUnordered, FutureExt},
5 std::{boxed::Box, future::Future, pin::Pin},
6};
7
8pub use futures_and_streams::{ErrorContext, FutureReader, StreamReader};
9
10mod futures_and_streams;
11
12/// Represents the result of a concurrent operation.
13///
14/// This is similar to a [`std::future::Future`] except that it represents an
15/// operation which requires exclusive access to a store in order to make
16/// progress -- without monopolizing that store for the lifetime of the
17/// operation.
18pub struct Promise<T>(Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>);
19
20impl<T: 'static> Promise<T> {
21 /// Map the result of this `Promise` from one value to another.
22 pub fn map<U>(self, fun: impl FnOnce(T) -> U + Send + Sync + 'static) -> Promise<U> {
23 Promise(Box::pin(self.0.map(fun)))
24 }
25
26 /// Convert this `Promise` to a future which may be `await`ed for its
27 /// result.
28 ///
29 /// The returned future will require exclusive use of the store until it
30 /// completes. If you need to await more than one `Promise` concurrently,
31 /// use [`PromisesUnordered`].
32 pub async fn get<U: Send>(self, store: impl AsContextMut<Data = U>) -> Result<T> {
33 _ = store;
34 todo!()
35 }
36
37 /// Convert this `Promise` to a future which may be `await`ed for its
38 /// result.
39 ///
40 /// Unlike [`Self::get`], this does _not_ take a store parameter, meaning
41 /// the returned future will not make progress until and unless the event
42 /// loop for the store it came from is polled. Thus, this method should
43 /// only be used from within host functions and not from top-level embedder
44 /// code.
45 pub fn into_future(self) -> Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>> {
46 self.0
47 }
48}
49
50/// Represents a collection of zero or more concurrent operations.
51///
52/// Similar to [`futures::stream::FuturesUnordered`], this type supports
53/// `await`ing more than one [`Promise`]s concurrently.
54pub struct PromisesUnordered<T>(
55 FuturesUnordered<Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>>,
56);
57
58impl<T: 'static> PromisesUnordered<T> {
59 /// Create a new `PromisesUnordered` with no entries.
60 pub fn new() -> Self {
61 Self(FuturesUnordered::new())
62 }
63
64 /// Add the specified [`Promise`] to this collection.
65 pub fn push(&mut self, promise: Promise<T>) {
66 self.0.push(promise.0)
67 }
68
69 /// Get the next result from this collection, if any.
70 pub async fn next<U: Send>(&mut self, store: impl AsContextMut<Data = U>) -> Result<Option<T>> {
71 _ = store;
72 todo!()
73 }
74}