Skip to main content

wasmtime/runtime/component/concurrent/
func.rs

1use crate::component::concurrent::TaskId;
2use crate::component::concurrent::{self, GuestTaskId, PreparedCall};
3use crate::component::func::LowerContext;
4use crate::component::{AsAccessor, ComponentNamedList, Func, Lift, Lower, TypedFunc, Val};
5use crate::prelude::*;
6use crate::runtime::vm::SendSyncPtr;
7use crate::{AsContextMut, StoreContextMut, ValRaw};
8use core::marker;
9use core::mem::MaybeUninit;
10use core::ptr::NonNull;
11use wasmtime_environ::component::{InterfaceType, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS};
12
13/// Returned from [`Func::start_call_concurrent`] to represent a
14/// pending-but-not-yet-resolved call into wasm.
15pub struct FuncCallConcurrent<'a, T> {
16    call: concurrent::QueuedCall<Vec<Val>>,
17    results: &'a mut [Val],
18    _marker: marker::PhantomData<fn(T)>,
19}
20
21impl Func {
22    /// Start a concurrent call to this function.
23    ///
24    /// Concurrency is achieved by relying on the [`Accessor`] argument, which
25    /// can be obtained by calling [`StoreContextMut::run_concurrent`].
26    ///
27    /// Unlike [`Self::call`] and [`Self::call_async`] (both of which require
28    /// exclusive access to the store until the completion of the call), calls
29    /// made using this method may run concurrently with other calls to the same
30    /// instance.  In addition, the runtime will call the `post-return` function
31    /// (if any) automatically when the guest task completes.
32    ///
33    /// # Progress
34    ///
35    /// For the wasm task being created in `call_concurrent` to make progress it
36    /// must be run within the scope of [`run_concurrent`]. If there are no
37    /// active calls to [`run_concurrent`] then the wasm task will appear as
38    /// stalled. This is typically not a concern as an [`Accessor`] is bound
39    /// by default to a scope of [`run_concurrent`].
40    ///
41    /// One situation in which this can arise, for example, is that if a
42    /// [`run_concurrent`] computation finishes its async closure before all
43    /// wasm tasks have completed, then there will be no scope of
44    /// [`run_concurrent`] anywhere. In this situation the wasm tasks that have
45    /// not yet completed will not make progress until [`run_concurrent`] is
46    /// called again.
47    ///
48    /// Embedders will need to ensure that this future is `await`'d within the
49    /// scope of [`run_concurrent`] to ensure that the value can be produced
50    /// during the `await` call.
51    ///
52    /// # Cancellation
53    ///
54    /// Cancelling an async task created via `call_concurrent`, at this time, is
55    /// only possible by dropping the store that the computation runs within.
56    /// With [#11833] implemented then it will be possible to request
57    /// cancellation of a task, but that is not yet implemented. Hard-cancelling
58    /// a task will only ever be possible by dropping the entire store and it is
59    /// not possible to remove just one task from a store.
60    ///
61    /// This async function behaves more like a "spawn" than a normal Rust async
62    /// function. When this function is invoked then metadata for the function
63    /// call is recorded in the store connected to the `accessor` argument and
64    /// the wasm invocation is from then on connected to the store. If the
65    /// future created by this function is dropped it does not cancel the
66    /// in-progress execution of the wasm task. Dropping the future
67    /// relinquishes the host's ability to learn about the result of the task
68    /// but the task will still progress and invoke callbacks and such until
69    /// completion.
70    ///
71    /// This function will return an error if [`Config::concurrency_support`] is
72    /// disabled.
73    ///
74    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
75    /// [`run_concurrent`]: crate::Store::run_concurrent
76    /// [#11833]: https://github.com/bytecodealliance/wasmtime/issues/11833
77    /// [`Accessor`]: crate::component::Accessor
78    ///
79    /// # Panics
80    ///
81    /// Panics if the store that the [`Accessor`] is derived from does not own
82    /// this function.
83    ///
84    /// # Example
85    ///
86    /// Using [`StoreContextMut::run_concurrent`] to get an [`Accessor`]:
87    ///
88    /// ```
89    /// # use {
90    /// #   wasmtime::{
91    /// #     error::{Result},
92    /// #     component::{Component, Linker, ResourceTable},
93    /// #     Config, Engine, Store
94    /// #   },
95    /// # };
96    /// #
97    /// # struct Ctx { table: ResourceTable }
98    /// #
99    /// # async fn foo() -> Result<()> {
100    /// # let mut config = Config::new();
101    /// # let engine = Engine::new(&config)?;
102    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
103    /// # let mut linker = Linker::new(&engine);
104    /// # let component = Component::new(&engine, "")?;
105    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
106    /// let my_func = instance.get_func(&mut store, "my_func").unwrap();
107    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
108    ///    my_func.call_concurrent(accessor, &[], &mut Vec::new()).await?;
109    ///    Ok(())
110    /// }).await??;
111    /// # Ok(())
112    /// # }
113    /// ```
114    pub async fn call_concurrent(
115        self,
116        accessor: impl AsAccessor<Data: Send>,
117        params: &[Val],
118        results: &mut [Val],
119    ) -> Result<()> {
120        let accessor = accessor.as_accessor();
121        let call = accessor.with(|store| self.start_call_concurrent(store, params, results))?;
122        self.finish_call_concurrent(accessor, call).await
123    }
124
125    /// Performs preparatory work for invoking this function with `params`,
126    /// returning a [`FuncCallConcurrent`]
127    /// which can be passed to [`Func::finish_call_concurrent`] to resolve
128    /// the call.
129    ///
130    /// For more information see [`Func::call_concurrent`].
131    pub fn start_call_concurrent<'a, T: Send + 'static>(
132        self,
133        mut store: impl AsContextMut<Data = T>,
134        params: &'a [Val],
135        results: &'a mut [Val],
136    ) -> Result<FuncCallConcurrent<'a, T>> {
137        self.check_params_results(store.as_context_mut(), params, results)?;
138        let prepared = self.prepare_call_dynamic(store.as_context_mut(), params.to_vec())?;
139        let call = concurrent::QueuedCall::new(store.as_context_mut(), prepared)?;
140        Ok(FuncCallConcurrent {
141            call,
142            results,
143            _marker: marker::PhantomData,
144        })
145    }
146
147    /// Completes a call that was initiated via
148    /// [`Func::start_call_concurrent`].
149    pub async fn finish_call_concurrent<T: Send>(
150        self,
151        accessor: impl AsAccessor<Data = T>,
152        call: FuncCallConcurrent<'_, T>,
153    ) -> Result<()> {
154        // Intentionally not used today, but left here for future API
155        // compatibility with using this.
156        let _ = accessor;
157        let FuncCallConcurrent { call, results, .. } = call;
158        let run_results = call.await?;
159        assert_eq!(run_results.len(), results.len());
160        for (result, slot) in run_results.into_iter().zip(results) {
161            *slot = result;
162        }
163        Ok(())
164    }
165
166    /// Calls `concurrent::prepare_call` with monomorphized functions for
167    /// lowering the parameters and lifting the result.
168    fn prepare_call_dynamic<'a, T: Send + 'static>(
169        self,
170        mut store: StoreContextMut<'a, T>,
171        params: Vec<Val>,
172    ) -> Result<PreparedCall<Vec<Val>>> {
173        let store = store.as_context_mut();
174
175        concurrent::prepare_call(
176            store,
177            self,
178            MAX_FLAT_PARAMS,
179            false,
180            move |func, store, params_out| {
181                func.with_lower_context(store, |cx, ty| {
182                    Self::lower_args(cx, &params, ty, params_out)
183                })
184            },
185            move |func, store, results| {
186                let max_flat = if func.abi_async(store) {
187                    MAX_FLAT_PARAMS
188                } else {
189                    MAX_FLAT_RESULTS
190                };
191                let results = func.with_lift_context(store, |cx, ty| {
192                    Self::lift_results(cx, ty, results, max_flat)?.collect::<Result<Vec<_>>>()
193                })?;
194                Ok(Box::new(results))
195            },
196        )
197    }
198}
199
200impl<T> FuncCallConcurrent<'_, T> {
201    /// Returns the task that this invocation corresponds to.
202    ///
203    /// This can be later correlated with [`StoreContextMut::async_call_stack`]
204    /// for example.
205    pub fn task(&self) -> GuestTaskId {
206        self.call.task()
207    }
208}
209
210/// Returned from [`TypedFunc::start_call_concurrent`] to represent a
211/// pending-but-not-yet-resolved call into wasm.
212pub struct TypedFuncCallConcurrent<T, P, R> {
213    call: concurrent::QueuedCall<R>,
214    _marker: marker::PhantomData<fn(T, P)>,
215}
216
217impl<Params, Return> TypedFunc<Params, Return>
218where
219    Params: ComponentNamedList + Lower,
220    Return: ComponentNamedList + Lift,
221{
222    pub(crate) async fn call_async_concurrent(
223        &self,
224        mut store: impl AsContextMut<Data: Send>,
225        params: Params,
226    ) -> Result<Return>
227    where
228        Return: 'static,
229    {
230        let mut store = store.as_context_mut();
231        let ptr = SendSyncPtr::from(NonNull::from(&params).cast::<u8>());
232        let prepared = self.prepare_call(store.as_context_mut(), true, move |cx, ty, dst| {
233            // SAFETY: The goal here is to get `Params`, a non-`'static`
234            // value, to live long enough to the lowering of the
235            // parameters. We're guaranteed that `Params` lives in the
236            // future of the outer function (we're in an `async fn`) so it'll
237            // stay alive as long as the future itself. That is distinct,
238            // for example, from the signature of `call_concurrent` below.
239            //
240            // Here a pointer to `Params` is smuggled to this location
241            // through a `SendSyncPtr<u8>` to thwart the `'static` check
242            // of rustc and the signature of `prepare_call`.
243            //
244            // Note the use of `SignalOnDrop` in the code that follows
245            // this closure, which ensures that the task will be removed
246            // from the concurrent state to which it belongs when the
247            // containing `Future` is dropped, so long as the parameters
248            // have not yet been lowered. Since this closure is removed from
249            // the task after the parameters are lowered, it will never be called
250            // after the containing `Future` is dropped.
251            let params = unsafe { ptr.cast::<Params>().as_ref() };
252            Self::lower_args(cx, ty, dst, params)
253        })?;
254
255        struct SignalOnDrop<'a, T: 'static> {
256            store: StoreContextMut<'a, T>,
257            task: TaskId,
258        }
259
260        impl<'a, T> Drop for SignalOnDrop<'a, T> {
261            fn drop(&mut self) {
262                self.task.host_future_dropped(self.store.0).unwrap();
263            }
264        }
265
266        let mut wrapper = SignalOnDrop {
267            store,
268            task: prepared.task_id(),
269        };
270
271        let result = concurrent::QueuedCall::new(wrapper.store.as_context_mut(), prepared)?;
272        wrapper
273            .store
274            .as_context_mut()
275            .run_concurrent_trap_on_idle(async |_| Ok(result.await?))
276            .await?
277    }
278
279    /// Start a concurrent call to this function.
280    ///
281    /// Concurrency is achieved by relying on the [`Accessor`] argument, which
282    /// can be obtained by calling [`StoreContextMut::run_concurrent`].
283    ///
284    /// Unlike [`Self::call`] and [`Self::call_async`] (both of which require
285    /// exclusive access to the store until the completion of the call), calls
286    /// made using this method may run concurrently with other calls to the same
287    /// instance.  In addition, the runtime will call the `post-return` function
288    /// (if any) automatically when the guest task completes.
289    ///
290    /// This function will return an error if [`Config::concurrency_support`] is
291    /// disabled.
292    ///
293    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
294    ///
295    /// # Progress and Cancellation
296    ///
297    /// For more information about how to make progress on the wasm task or how
298    /// to cancel the wasm task see the documentation for
299    /// [`Func::call_concurrent`].
300    ///
301    /// [`Func::call_concurrent`]: crate::component::Func::call_concurrent
302    ///
303    /// # Panics
304    ///
305    /// Panics if the store that the [`Accessor`] is derived from does not own
306    /// this function.
307    ///
308    /// [`Accessor`]: crate::component::Accessor
309    ///
310    /// # Example
311    ///
312    /// Using [`StoreContextMut::run_concurrent`] to get an [`Accessor`]:
313    ///
314    /// ```
315    /// # use {
316    /// #   wasmtime::{
317    /// #     error::{Result},
318    /// #     component::{Component, Linker, ResourceTable},
319    /// #     Config, Engine, Store
320    /// #   },
321    /// # };
322    /// #
323    /// # struct Ctx { table: ResourceTable }
324    /// #
325    /// # async fn foo() -> Result<()> {
326    /// # let mut config = Config::new();
327    /// # let engine = Engine::new(&config)?;
328    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
329    /// # let mut linker = Linker::new(&engine);
330    /// # let component = Component::new(&engine, "")?;
331    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
332    /// let my_typed_func = instance.get_typed_func::<(), ()>(&mut store, "my_typed_func")?;
333    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
334    ///    my_typed_func.call_concurrent(accessor, ()).await?;
335    ///    Ok(())
336    /// }).await??;
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub async fn call_concurrent(
341        self,
342        accessor: impl AsAccessor<Data: Send>,
343        params: Params,
344    ) -> Result<Return>
345    where
346        Params: 'static,
347        Return: 'static,
348    {
349        let call = accessor
350            .as_accessor()
351            .with(|store| self.start_call_concurrent(store, params))?;
352        self.finish_call_concurrent(accessor, call).await
353    }
354
355    /// Performs preparatory work for invoking this function with `params`,
356    /// returning a [`TypedFuncCallConcurrent`]
357    /// which can be passed to [`TypedFunc::finish_call_concurrent`] to resolve
358    /// the call.
359    ///
360    /// For more information see [`TypedFunc::call_concurrent`].
361    pub fn start_call_concurrent<T>(
362        self,
363        mut store: impl AsContextMut<Data = T>,
364        params: Params,
365    ) -> Result<TypedFuncCallConcurrent<T, Params, Return>>
366    where
367        T: Send + 'static,
368        Params: 'static,
369        Return: 'static,
370    {
371        let mut store = store.as_context_mut();
372        let mut store = store.as_context_mut();
373        ensure!(
374            store.0.concurrency_support(),
375            "cannot use `call_concurrent` Config::concurrency_support disabled",
376        );
377
378        let prepared = self.prepare_call(store.as_context_mut(), false, move |cx, ty, dst| {
379            Self::lower_args(cx, ty, dst, &params)
380        })?;
381        let call = concurrent::QueuedCall::new(store, prepared)?;
382        Ok(TypedFuncCallConcurrent {
383            call,
384            _marker: marker::PhantomData,
385        })
386    }
387
388    /// Completes a call that was initiated via
389    /// [`TypedFunc::start_call_concurrent`].
390    pub async fn finish_call_concurrent<T>(
391        self,
392        accessor: impl AsAccessor<Data = T>,
393        call: TypedFuncCallConcurrent<T, Params, Return>,
394    ) -> Result<Return>
395    where
396        T: Send + 'static,
397        Params: 'static,
398        Return: 'static,
399    {
400        // This is intentionally part of the public API but not used yet.
401        // This'll likely want to be used in future refactorings.
402        let _ = accessor;
403        call.call.await
404    }
405
406    /// Calls `concurrent::prepare_call` with monomorphized functions for
407    /// lowering the parameters and lifting the result according to the number
408    /// of core Wasm parameters and results in the signature of the function to
409    /// be called.
410    fn prepare_call<T>(
411        self,
412        store: StoreContextMut<'_, T>,
413        host_future_present: bool,
414        lower: impl FnOnce(
415            &mut LowerContext<T>,
416            InterfaceType,
417            &mut [MaybeUninit<ValRaw>],
418        ) -> Result<()>
419        + Send
420        + Sync
421        + 'static,
422    ) -> Result<PreparedCall<Return>>
423    where
424        Return: 'static,
425    {
426        use crate::component::storage::slice_to_storage;
427        debug_assert!(store.0.concurrency_support());
428
429        let param_count = if Params::flatten_count() <= MAX_FLAT_PARAMS {
430            Params::flatten_count()
431        } else {
432            1
433        };
434        let max_results = if self.func().abi_async(store.0) {
435            MAX_FLAT_PARAMS
436        } else {
437            MAX_FLAT_RESULTS
438        };
439        concurrent::prepare_call(
440            store,
441            *self.func(),
442            param_count,
443            host_future_present,
444            move |func, store, params_out| {
445                func.with_lower_context(store, |cx, ty| lower(cx, ty, params_out))
446            },
447            move |func, store, results| {
448                let result = if Return::flatten_count() <= max_results {
449                    func.with_lift_context(store, |cx, ty| {
450                        // SAFETY: Per the safety requirements documented for the
451                        // `ComponentType` trait, `Return::Lower` must be
452                        // compatible at the binary level with a `[ValRaw; N]`,
453                        // where `N` is `mem::size_of::<Return::Lower>() /
454                        // mem::size_of::<ValRaw>()`.  And since this function
455                        // is only used when `Return::flatten_count() <=
456                        // MAX_FLAT_RESULTS` and `MAX_FLAT_RESULTS == 1`, `N`
457                        // can only either be 0 or 1.
458                        //
459                        // See `ComponentInstance::exit_call` for where we use
460                        // the result count passed from
461                        // `wasmtime_environ::fact::trampoline`-generated code
462                        // to ensure the slice has the correct length, and also
463                        // `concurrent::start_call` for where we conservatively
464                        // use a slice length of 1 unconditionally.  Also note
465                        // that, as of this writing `slice_to_storage`
466                        // double-checks the slice length is sufficient.
467                        let results: &Return::Lower = unsafe { slice_to_storage(results) };
468                        Self::lift_stack_result(cx, ty, results)
469                    })?
470                } else {
471                    func.with_lift_context(store, |cx, ty| {
472                        Self::lift_heap_result(cx, ty, &results[0])
473                    })?
474                };
475                Ok(Box::new(result))
476            },
477        )
478    }
479}
480
481impl<T, P, R> TypedFuncCallConcurrent<T, P, R> {
482    /// Returns the task that this invocation corresponds to.
483    ///
484    /// This can be later correlated with [`StoreContextMut::async_call_stack`]
485    /// for example.
486    pub fn task(&self) -> GuestTaskId {
487        self.call.task()
488    }
489}