wasmtime/runtime/component/concurrent/func.rs
1use crate::component::concurrent::TaskId;
2use crate::component::concurrent::{self, GuestTaskId, PreparedCall};
3use crate::component::func::LowerContext;
4use crate::component::{AsAccessor, ComponentNamedList, Func, Lift, Lower, TypedFunc, Val};
5use crate::prelude::*;
6use crate::runtime::vm::SendSyncPtr;
7use crate::{AsContextMut, StoreContextMut, ValRaw};
8use core::marker;
9use core::mem::MaybeUninit;
10use core::ptr::NonNull;
11use wasmtime_environ::component::{InterfaceType, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS};
12
13/// Returned from [`Func::start_call_concurrent`] to represent a
14/// pending-but-not-yet-resolved call into wasm.
15pub struct FuncCallConcurrent<'a, T> {
16 call: concurrent::QueuedCall<Vec<Val>>,
17 results: &'a mut [Val],
18 _marker: marker::PhantomData<fn(T)>,
19}
20
21impl Func {
22 /// Start a concurrent call to this function.
23 ///
24 /// Concurrency is achieved by relying on the [`Accessor`] argument, which
25 /// can be obtained by calling [`StoreContextMut::run_concurrent`].
26 ///
27 /// Unlike [`Self::call`] and [`Self::call_async`] (both of which require
28 /// exclusive access to the store until the completion of the call), calls
29 /// made using this method may run concurrently with other calls to the same
30 /// instance. In addition, the runtime will call the `post-return` function
31 /// (if any) automatically when the guest task completes.
32 ///
33 /// # Progress
34 ///
35 /// For the wasm task being created in `call_concurrent` to make progress it
36 /// must be run within the scope of [`run_concurrent`]. If there are no
37 /// active calls to [`run_concurrent`] then the wasm task will appear as
38 /// stalled. This is typically not a concern as an [`Accessor`] is bound
39 /// by default to a scope of [`run_concurrent`].
40 ///
41 /// One situation in which this can arise, for example, is that if a
42 /// [`run_concurrent`] computation finishes its async closure before all
43 /// wasm tasks have completed, then there will be no scope of
44 /// [`run_concurrent`] anywhere. In this situation the wasm tasks that have
45 /// not yet completed will not make progress until [`run_concurrent`] is
46 /// called again.
47 ///
48 /// Embedders will need to ensure that this future is `await`'d within the
49 /// scope of [`run_concurrent`] to ensure that the value can be produced
50 /// during the `await` call.
51 ///
52 /// # Cancellation
53 ///
54 /// Cancelling an async task created via `call_concurrent`, at this time, is
55 /// only possible by dropping the store that the computation runs within.
56 /// With [#11833] implemented then it will be possible to request
57 /// cancellation of a task, but that is not yet implemented. Hard-cancelling
58 /// a task will only ever be possible by dropping the entire store and it is
59 /// not possible to remove just one task from a store.
60 ///
61 /// This async function behaves more like a "spawn" than a normal Rust async
62 /// function. When this function is invoked then metadata for the function
63 /// call is recorded in the store connected to the `accessor` argument and
64 /// the wasm invocation is from then on connected to the store. If the
65 /// future created by this function is dropped it does not cancel the
66 /// in-progress execution of the wasm task. Dropping the future
67 /// relinquishes the host's ability to learn about the result of the task
68 /// but the task will still progress and invoke callbacks and such until
69 /// completion.
70 ///
71 /// This function will return an error if [`Config::concurrency_support`] is
72 /// disabled.
73 ///
74 /// [`Config::concurrency_support`]: crate::Config::concurrency_support
75 /// [`run_concurrent`]: crate::Store::run_concurrent
76 /// [#11833]: https://github.com/bytecodealliance/wasmtime/issues/11833
77 /// [`Accessor`]: crate::component::Accessor
78 ///
79 /// # Panics
80 ///
81 /// Panics if the store that the [`Accessor`] is derived from does not own
82 /// this function.
83 ///
84 /// # Example
85 ///
86 /// Using [`StoreContextMut::run_concurrent`] to get an [`Accessor`]:
87 ///
88 /// ```
89 /// # use {
90 /// # wasmtime::{
91 /// # error::{Result},
92 /// # component::{Component, Linker, ResourceTable},
93 /// # Config, Engine, Store
94 /// # },
95 /// # };
96 /// #
97 /// # struct Ctx { table: ResourceTable }
98 /// #
99 /// # async fn foo() -> Result<()> {
100 /// # let mut config = Config::new();
101 /// # let engine = Engine::new(&config)?;
102 /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
103 /// # let mut linker = Linker::new(&engine);
104 /// # let component = Component::new(&engine, "")?;
105 /// # let instance = linker.instantiate_async(&mut store, &component).await?;
106 /// let my_func = instance.get_func(&mut store, "my_func").unwrap();
107 /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
108 /// my_func.call_concurrent(accessor, &[], &mut Vec::new()).await?;
109 /// Ok(())
110 /// }).await??;
111 /// # Ok(())
112 /// # }
113 /// ```
114 pub async fn call_concurrent(
115 self,
116 accessor: impl AsAccessor<Data: Send>,
117 params: &[Val],
118 results: &mut [Val],
119 ) -> Result<()> {
120 let accessor = accessor.as_accessor();
121 let call = accessor.with(|store| self.start_call_concurrent(store, params, results))?;
122 self.finish_call_concurrent(accessor, call).await
123 }
124
125 /// Performs preparatory work for invoking this function with `params`,
126 /// returning a [`FuncCallConcurrent`]
127 /// which can be passed to [`Func::finish_call_concurrent`] to resolve
128 /// the call.
129 ///
130 /// For more information see [`Func::call_concurrent`].
131 pub fn start_call_concurrent<'a, T: Send + 'static>(
132 self,
133 mut store: impl AsContextMut<Data = T>,
134 params: &'a [Val],
135 results: &'a mut [Val],
136 ) -> Result<FuncCallConcurrent<'a, T>> {
137 self.check_params_results(store.as_context_mut(), params, results)?;
138 let prepared = self.prepare_call_dynamic(store.as_context_mut(), params.to_vec())?;
139 let call = concurrent::QueuedCall::new(store.as_context_mut(), prepared)?;
140 Ok(FuncCallConcurrent {
141 call,
142 results,
143 _marker: marker::PhantomData,
144 })
145 }
146
147 /// Completes a call that was initiated via
148 /// [`Func::start_call_concurrent`].
149 pub async fn finish_call_concurrent<T: Send>(
150 self,
151 accessor: impl AsAccessor<Data = T>,
152 call: FuncCallConcurrent<'_, T>,
153 ) -> Result<()> {
154 // Intentionally not used today, but left here for future API
155 // compatibility with using this.
156 let _ = accessor;
157 let FuncCallConcurrent { call, results, .. } = call;
158 let run_results = call.await?;
159 assert_eq!(run_results.len(), results.len());
160 for (result, slot) in run_results.into_iter().zip(results) {
161 *slot = result;
162 }
163 Ok(())
164 }
165
166 /// Calls `concurrent::prepare_call` with monomorphized functions for
167 /// lowering the parameters and lifting the result.
168 fn prepare_call_dynamic<'a, T: Send + 'static>(
169 self,
170 mut store: StoreContextMut<'a, T>,
171 params: Vec<Val>,
172 ) -> Result<PreparedCall<Vec<Val>>> {
173 let store = store.as_context_mut();
174
175 concurrent::prepare_call(
176 store,
177 self,
178 MAX_FLAT_PARAMS,
179 false,
180 move |func, store, params_out| {
181 func.with_lower_context(store, |cx, ty| {
182 Self::lower_args(cx, ¶ms, ty, params_out)
183 })
184 },
185 move |func, store, results| {
186 let max_flat = if func.abi_async(store) {
187 MAX_FLAT_PARAMS
188 } else {
189 MAX_FLAT_RESULTS
190 };
191 let results = func.with_lift_context(store, |cx, ty| {
192 Self::lift_results(cx, ty, results, max_flat)?.collect::<Result<Vec<_>>>()
193 })?;
194 Ok(Box::new(results))
195 },
196 )
197 }
198}
199
200impl<T> FuncCallConcurrent<'_, T> {
201 /// Returns the task that this invocation corresponds to.
202 ///
203 /// This can be later correlated with [`StoreContextMut::async_call_stack`]
204 /// for example.
205 pub fn task(&self) -> GuestTaskId {
206 self.call.task()
207 }
208}
209
210/// Returned from [`TypedFunc::start_call_concurrent`] to represent a
211/// pending-but-not-yet-resolved call into wasm.
212pub struct TypedFuncCallConcurrent<T, P, R> {
213 call: concurrent::QueuedCall<R>,
214 _marker: marker::PhantomData<fn(T, P)>,
215}
216
217impl<Params, Return> TypedFunc<Params, Return>
218where
219 Params: ComponentNamedList + Lower,
220 Return: ComponentNamedList + Lift,
221{
222 pub(crate) async fn call_async_concurrent(
223 &self,
224 mut store: impl AsContextMut<Data: Send>,
225 params: Params,
226 ) -> Result<Return>
227 where
228 Return: 'static,
229 {
230 let mut store = store.as_context_mut();
231 let ptr = SendSyncPtr::from(NonNull::from(¶ms).cast::<u8>());
232 let prepared = self.prepare_call(store.as_context_mut(), true, move |cx, ty, dst| {
233 // SAFETY: The goal here is to get `Params`, a non-`'static`
234 // value, to live long enough to the lowering of the
235 // parameters. We're guaranteed that `Params` lives in the
236 // future of the outer function (we're in an `async fn`) so it'll
237 // stay alive as long as the future itself. That is distinct,
238 // for example, from the signature of `call_concurrent` below.
239 //
240 // Here a pointer to `Params` is smuggled to this location
241 // through a `SendSyncPtr<u8>` to thwart the `'static` check
242 // of rustc and the signature of `prepare_call`.
243 //
244 // Note the use of `SignalOnDrop` in the code that follows
245 // this closure, which ensures that the task will be removed
246 // from the concurrent state to which it belongs when the
247 // containing `Future` is dropped, so long as the parameters
248 // have not yet been lowered. Since this closure is removed from
249 // the task after the parameters are lowered, it will never be called
250 // after the containing `Future` is dropped.
251 let params = unsafe { ptr.cast::<Params>().as_ref() };
252 Self::lower_args(cx, ty, dst, params)
253 })?;
254
255 struct SignalOnDrop<'a, T: 'static> {
256 store: StoreContextMut<'a, T>,
257 task: TaskId,
258 }
259
260 impl<'a, T> Drop for SignalOnDrop<'a, T> {
261 fn drop(&mut self) {
262 self.task.host_future_dropped(self.store.0).unwrap();
263 }
264 }
265
266 let mut wrapper = SignalOnDrop {
267 store,
268 task: prepared.task_id(),
269 };
270
271 let result = concurrent::QueuedCall::new(wrapper.store.as_context_mut(), prepared)?;
272 wrapper
273 .store
274 .as_context_mut()
275 .run_concurrent_trap_on_idle(async |_| Ok(result.await?))
276 .await?
277 }
278
279 /// Start a concurrent call to this function.
280 ///
281 /// Concurrency is achieved by relying on the [`Accessor`] argument, which
282 /// can be obtained by calling [`StoreContextMut::run_concurrent`].
283 ///
284 /// Unlike [`Self::call`] and [`Self::call_async`] (both of which require
285 /// exclusive access to the store until the completion of the call), calls
286 /// made using this method may run concurrently with other calls to the same
287 /// instance. In addition, the runtime will call the `post-return` function
288 /// (if any) automatically when the guest task completes.
289 ///
290 /// This function will return an error if [`Config::concurrency_support`] is
291 /// disabled.
292 ///
293 /// [`Config::concurrency_support`]: crate::Config::concurrency_support
294 ///
295 /// # Progress and Cancellation
296 ///
297 /// For more information about how to make progress on the wasm task or how
298 /// to cancel the wasm task see the documentation for
299 /// [`Func::call_concurrent`].
300 ///
301 /// [`Func::call_concurrent`]: crate::component::Func::call_concurrent
302 ///
303 /// # Panics
304 ///
305 /// Panics if the store that the [`Accessor`] is derived from does not own
306 /// this function.
307 ///
308 /// [`Accessor`]: crate::component::Accessor
309 ///
310 /// # Example
311 ///
312 /// Using [`StoreContextMut::run_concurrent`] to get an [`Accessor`]:
313 ///
314 /// ```
315 /// # use {
316 /// # wasmtime::{
317 /// # error::{Result},
318 /// # component::{Component, Linker, ResourceTable},
319 /// # Config, Engine, Store
320 /// # },
321 /// # };
322 /// #
323 /// # struct Ctx { table: ResourceTable }
324 /// #
325 /// # async fn foo() -> Result<()> {
326 /// # let mut config = Config::new();
327 /// # let engine = Engine::new(&config)?;
328 /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
329 /// # let mut linker = Linker::new(&engine);
330 /// # let component = Component::new(&engine, "")?;
331 /// # let instance = linker.instantiate_async(&mut store, &component).await?;
332 /// let my_typed_func = instance.get_typed_func::<(), ()>(&mut store, "my_typed_func")?;
333 /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
334 /// my_typed_func.call_concurrent(accessor, ()).await?;
335 /// Ok(())
336 /// }).await??;
337 /// # Ok(())
338 /// # }
339 /// ```
340 pub async fn call_concurrent(
341 self,
342 accessor: impl AsAccessor<Data: Send>,
343 params: Params,
344 ) -> Result<Return>
345 where
346 Params: 'static,
347 Return: 'static,
348 {
349 let call = accessor
350 .as_accessor()
351 .with(|store| self.start_call_concurrent(store, params))?;
352 self.finish_call_concurrent(accessor, call).await
353 }
354
355 /// Performs preparatory work for invoking this function with `params`,
356 /// returning a [`TypedFuncCallConcurrent`]
357 /// which can be passed to [`TypedFunc::finish_call_concurrent`] to resolve
358 /// the call.
359 ///
360 /// For more information see [`TypedFunc::call_concurrent`].
361 pub fn start_call_concurrent<T>(
362 self,
363 mut store: impl AsContextMut<Data = T>,
364 params: Params,
365 ) -> Result<TypedFuncCallConcurrent<T, Params, Return>>
366 where
367 T: Send + 'static,
368 Params: 'static,
369 Return: 'static,
370 {
371 let mut store = store.as_context_mut();
372 let mut store = store.as_context_mut();
373 ensure!(
374 store.0.concurrency_support(),
375 "cannot use `call_concurrent` Config::concurrency_support disabled",
376 );
377
378 let prepared = self.prepare_call(store.as_context_mut(), false, move |cx, ty, dst| {
379 Self::lower_args(cx, ty, dst, ¶ms)
380 })?;
381 let call = concurrent::QueuedCall::new(store, prepared)?;
382 Ok(TypedFuncCallConcurrent {
383 call,
384 _marker: marker::PhantomData,
385 })
386 }
387
388 /// Completes a call that was initiated via
389 /// [`TypedFunc::start_call_concurrent`].
390 pub async fn finish_call_concurrent<T>(
391 self,
392 accessor: impl AsAccessor<Data = T>,
393 call: TypedFuncCallConcurrent<T, Params, Return>,
394 ) -> Result<Return>
395 where
396 T: Send + 'static,
397 Params: 'static,
398 Return: 'static,
399 {
400 // This is intentionally part of the public API but not used yet.
401 // This'll likely want to be used in future refactorings.
402 let _ = accessor;
403 call.call.await
404 }
405
406 /// Calls `concurrent::prepare_call` with monomorphized functions for
407 /// lowering the parameters and lifting the result according to the number
408 /// of core Wasm parameters and results in the signature of the function to
409 /// be called.
410 fn prepare_call<T>(
411 self,
412 store: StoreContextMut<'_, T>,
413 host_future_present: bool,
414 lower: impl FnOnce(
415 &mut LowerContext<T>,
416 InterfaceType,
417 &mut [MaybeUninit<ValRaw>],
418 ) -> Result<()>
419 + Send
420 + Sync
421 + 'static,
422 ) -> Result<PreparedCall<Return>>
423 where
424 Return: 'static,
425 {
426 use crate::component::storage::slice_to_storage;
427 debug_assert!(store.0.concurrency_support());
428
429 let param_count = if Params::flatten_count() <= MAX_FLAT_PARAMS {
430 Params::flatten_count()
431 } else {
432 1
433 };
434 let max_results = if self.func().abi_async(store.0) {
435 MAX_FLAT_PARAMS
436 } else {
437 MAX_FLAT_RESULTS
438 };
439 concurrent::prepare_call(
440 store,
441 *self.func(),
442 param_count,
443 host_future_present,
444 move |func, store, params_out| {
445 func.with_lower_context(store, |cx, ty| lower(cx, ty, params_out))
446 },
447 move |func, store, results| {
448 let result = if Return::flatten_count() <= max_results {
449 func.with_lift_context(store, |cx, ty| {
450 // SAFETY: Per the safety requirements documented for the
451 // `ComponentType` trait, `Return::Lower` must be
452 // compatible at the binary level with a `[ValRaw; N]`,
453 // where `N` is `mem::size_of::<Return::Lower>() /
454 // mem::size_of::<ValRaw>()`. And since this function
455 // is only used when `Return::flatten_count() <=
456 // MAX_FLAT_RESULTS` and `MAX_FLAT_RESULTS == 1`, `N`
457 // can only either be 0 or 1.
458 //
459 // See `ComponentInstance::exit_call` for where we use
460 // the result count passed from
461 // `wasmtime_environ::fact::trampoline`-generated code
462 // to ensure the slice has the correct length, and also
463 // `concurrent::start_call` for where we conservatively
464 // use a slice length of 1 unconditionally. Also note
465 // that, as of this writing `slice_to_storage`
466 // double-checks the slice length is sufficient.
467 let results: &Return::Lower = unsafe { slice_to_storage(results) };
468 Self::lift_stack_result(cx, ty, results)
469 })?
470 } else {
471 func.with_lift_context(store, |cx, ty| {
472 Self::lift_heap_result(cx, ty, &results[0])
473 })?
474 };
475 Ok(Box::new(result))
476 },
477 )
478 }
479}
480
481impl<T, P, R> TypedFuncCallConcurrent<T, P, R> {
482 /// Returns the task that this invocation corresponds to.
483 ///
484 /// This can be later correlated with [`StoreContextMut::async_call_stack`]
485 /// for example.
486 pub fn task(&self) -> GuestTaskId {
487 self.call.task()
488 }
489}