wasmtime/runtime/component/concurrent.rs
1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics. See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create. Each
11//! `ComponentInstance` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `Instance::run_concurrent`. The `ComponentInstance::poll_until`
14//! function contains the loop itself, while the
15//! `ComponentInstance::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//!   async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `Instance::run_concurrent`: Run the event loop for the specified instance,
25//! allowing any and all tasks belonging to that instance to make progress.
26//!
27//! - `Instance::spawn`: Run a background task as part of the event loop for the
28//! specified instance.
29//!
30//! - `Instance::{future,stream}`: Create a new Component Model `future` or
31//! `stream`; the read end may be passed to the guest.
32//!
33//! - `{Future,Stream}Reader::read` and `{Future,Stream}Writer::write`: read
34//! from or write to a future or stream, respectively.
35//!
36//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
37//!
38//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
39//! function with the linker. That function will take an `Accessor` as its
40//! first parameter, which provides access to the store and instance between
41//! (but not across) await points.
42//!
43//! - `Accessor::with`: Access the store, its associated data, and the current
44//! instance.
45//!
46//! - `Accessor::spawn`: Run a background task as part of the event loop for the
47//! specified instance. This is equivalent to `Instance::spawn` but more
48//! convenient to use in host functions.
49
50use crate::component::func::{self, Func, Options};
51use crate::component::{Component, ComponentInstanceId, HasData, HasSelf, Instance};
52use crate::fiber::{self, StoreFiber, StoreFiberYield};
53use crate::store::{StoreInner, StoreOpaque, StoreToken};
54use crate::vm::component::{CallContext, InstanceFlags, ResourceTables};
55use crate::vm::{SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
56use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
57use anyhow::{Context as _, Result, anyhow, bail};
58use error_contexts::{GlobalErrorContextRefCount, LocalErrorContextRefCount};
59use futures::channel::oneshot;
60use futures::future::{self, Either, FutureExt};
61use futures::stream::{FuturesUnordered, StreamExt};
62use futures_and_streams::{FlatAbi, ReturnCode, StreamFutureState, TableIndex, TransmitHandle};
63use states::StateTable;
64use std::any::Any;
65use std::borrow::ToOwned;
66use std::boxed::Box;
67use std::cell::UnsafeCell;
68use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
69use std::fmt;
70use std::future::Future;
71use std::marker::PhantomData;
72use std::mem::{self, ManuallyDrop, MaybeUninit};
73use std::ops::DerefMut;
74use std::pin::{Pin, pin};
75use std::ptr::{self, NonNull};
76use std::slice;
77use std::sync::Mutex;
78use std::task::{Context, Poll, Waker};
79use std::vec::Vec;
80use table::{Table, TableDebug, TableError, TableId};
81use wasmtime_environ::PrimaryMap;
82use wasmtime_environ::component::{
83 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
84 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
85 RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
86 TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
87 TypeTupleIndex,
88};
89
90pub use abort::AbortHandle;
91pub use futures_and_streams::{
92 ErrorContext, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter,
93 GuardedStreamReader, GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VecBuffer,
94 WriteBuffer,
95};
96pub(crate) use futures_and_streams::{
97 ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
98};
99
100mod abort;
101mod error_contexts;
102mod futures_and_streams;
103mod states;
104mod table;
105pub(crate) mod tls;
106
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// The value is the all-ones 32-bit pattern (i.e. `u32::MAX`).
const BLOCKED: u32 = 0xffff_ffff;
110
/// State of a call; mirrors `CallState` in the upstream spec.
///
/// The explicit discriminants are the integer encodings produced by
/// [`Status::pack`] and by casting a status with `as u32`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Encodes this status and an optional `waitable` handle as the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// Bits 0..4 carry the status discriminant; bits 4..32 carry the
    /// waitable handle, or zero when there is none.
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied together with `Status::Returned`, if
    /// no waitable is supplied for any other status, or if the handle does
    /// not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        /// Number of low bits reserved for the status discriminant.
        const STATUS_BITS: u32 = 4;

        // A waitable must be present exactly when the call has not returned.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        assert!(waitable < (1 << 28));

        let status = self as u32;
        status | (waitable << STATUS_BITS)
    }
}
134
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// `Event::parts` lowers the variant together with its `status` or `code`
/// payload to core Wasm integers; the `pending` fields are not part of that
/// lowered form.
#[derive(Clone, Copy, Debug)]
enum Event {
    None,
    Cancelled,
    Subtask {
        status: Status,
    },
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably identifies the stream table and handle of
        // an operation still in flight -- confirm against the code that sets it.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
161
162impl Event {
163 /// Lower this event to core Wasm integers for delivery to the guest.
164 ///
165 /// Note that the waitable handle, if any, is assumed to be lowered
166 /// separately.
167 fn parts(self) -> (u32, u32) {
168 const EVENT_NONE: u32 = 0;
169 const EVENT_SUBTASK: u32 = 1;
170 const EVENT_STREAM_READ: u32 = 2;
171 const EVENT_STREAM_WRITE: u32 = 3;
172 const EVENT_FUTURE_READ: u32 = 4;
173 const EVENT_FUTURE_WRITE: u32 = 5;
174 const EVENT_CANCELLED: u32 = 6;
175 match self {
176 Event::None => (EVENT_NONE, 0),
177 Event::Cancelled => (EVENT_CANCELLED, 0),
178 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
179 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
180 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
181 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
182 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
183 }
184 }
185}
186
/// Corresponds to `CallbackCode` in the spec.
///
/// These are the codes an async-lifted export or its callback returns; they
/// are interpreted by `ConcurrentState::handle_callback_code`.
mod callback_code {
    /// The task is exiting.
    pub const EXIT: u32 = 0;
    /// The task yields and should run again after other tasks have had a
    /// chance to run.
    pub const YIELD: u32 = 1;
    // NOTE(review): WAIT and POLL are accompanied by a waitable-set handle
    // (see `unpack_callback_code`); presumably WAIT blocks for an event while
    // POLL only checks for one -- confirm against the spec.
    pub const WAIT: u32 = 2;
    pub const POLL: u32 = 3;
}
194
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// The value is re-exported from `wasmtime_environ`, converted to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
199
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The accessor this access was obtained from; it supplies the data
    /// projection used by `get` and the component instance.
    accessor: &'a Accessor<T, D>,
    /// Mutable handle to the store for the lifetime `'a` of this access.
    store: StoreContextMut<'a, T>,
}
209
210impl<'a, T, D> Access<'a, T, D>
211where
212 D: HasData + ?Sized,
213 T: 'static,
214{
215 /// Get mutable access to the store data.
216 pub fn data_mut(&mut self) -> &mut T {
217 self.store.data_mut()
218 }
219
220 /// Get mutable access to the store data.
221 pub fn get(&mut self) -> D::Data<'_> {
222 let get_data = self.accessor.get_data;
223 get_data(self.data_mut())
224 }
225
226 /// Spawn a background task.
227 ///
228 /// See [`Accessor::spawn`] for details.
229 pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> AbortHandle
230 where
231 T: 'static,
232 {
233 self.accessor.instance.unwrap().spawn_with_accessor(
234 self.store.as_context_mut(),
235 self.accessor.clone_for_spawn(),
236 task,
237 )
238 }
239
240 /// Retrieve the component instance of the caller.
241 pub fn instance(&self) -> Instance {
242 self.accessor.instance()
243 }
244}
245
/// Lets an [`Access`] be used wherever a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    /// Reborrows the wrapped store context immutably.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
257
/// Lets an [`Access`] be used wherever a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Reborrows the wrapped store context mutably.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
267
268/// Provides scoped mutable access to store data in the context of a concurrent
269/// host task future.
270///
271/// This allows multiple host task futures to execute concurrently and access
272/// the store between (but not across) `await` points.
273///
274/// # Rationale
275///
276/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
277/// `D::Data<'_>`. The problem this is solving, however, is that it does not
278/// literally store these values. The basic problem is that when a concurrent
279/// host future is being polled it has access to `&mut T` (and the whole
280/// `Store`) but when it's not being polled it does not have access to these
281/// values. This reflects how the store is only ever polling one future at a
282/// time so the store is effectively being passed between futures.
283///
284/// Rust's `Future` trait, however, has no means of passing a `Store`
285/// temporarily between futures. The [`Context`](std::task::Context) type does
286/// not have the ability to attach arbitrary information to it at this time.
287/// This type, [`Accessor`], is used to bridge this expressivity gap.
288///
289/// The [`Accessor`] type here represents the ability to acquire, temporarily in
290/// a synchronous manner, the current store. The [`Accessor::with`] function
291/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
292/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
293/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
295/// how the store is temporarily made available while a host future is being
296/// polled.
297///
298/// # Implementation
299///
300/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
301/// this type additionally doesn't even have a lifetime parameter. This is
302/// instead a representation of proof of the ability to acquire these while a
303/// future is being polled. Wasmtime will, when it polls a host future,
304/// configure ambient state such that the `Accessor` that a future closes over
305/// will work and be able to access the store.
306///
307/// This has a number of implications for users such as:
308///
309/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
310/// the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
312/// alive probably for the duration of the entire future.
313/// * Different host futures will be given different `Accessor`s, and that's
314/// intentional.
315/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
316/// alleviates some otherwise required bounds to be written down.
317///
318/// # Using `Accessor` in `Drop`
319///
320/// The methods on `Accessor` are only expected to work in the context of
321/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
322/// host future can be dropped at any time throughout the system and Wasmtime
323/// store context is not necessarily available at that time. It's recommended to
324/// not use `Accessor` methods in anything connected to a `Drop` implementation
325/// as they will panic and have unintended results. If you run into this though
326/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by `Accessor::with` to turn the ambient store into a
    /// `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store's `&mut T` to `D::Data<'_>`.
    get_data: fn(&mut T) -> D::Data<'_>,
    /// The component instance this accessor belongs to, if any; methods such
    /// as `Accessor::instance` panic when this is `None`.
    instance: Option<Instance>,
}
335
336/// A helper trait to take any type of accessor-with-data in functions.
337///
338/// This trait is similar to [`AsContextMut`] except that it's used when
339/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
340/// [`Accessor`] is the main type used in concurrent settings and is passed to
341/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
342///
343/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
344/// this trait. This effectively means that regardless of the `D` in
345/// `Accessor<T, D>` it can still be passed to a function which just needs a
346/// store accessor.
347///
348/// Acquiring an [`Accessor`] can be done through [`Instance::run_concurrent`]
349/// for example or in a host function through
350/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The returned reference borrows from `self`.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
362
363impl<T: AsAccessor + ?Sized> AsAccessor for &T {
364 type Data = T::Data;
365 type AccessorData = T::AccessorData;
366
367 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
368 T::as_accessor(self)
369 }
370}
371
/// The base case: an [`Accessor`] is trivially its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// Returns `self` unchanged.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
380
381// Note that it is intentional at this time that `Accessor` does not actually
382// store `&mut T` or anything similar. This distinctly enables the `Accessor`
383// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
384// that matter). This is used to ergonomically simplify bindings where the
385// majority of the time `Accessor` is closed over in a future which then needs
386// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
387// you already have to write `T: 'static`...) it helps to avoid this.
388//
389// Note as well that `Accessor` doesn't actually store its data at all. Instead
390// it's more of a "proof" of what can be accessed from TLS. API design around
391// `Accessor` and functions like `Linker::func_wrap_concurrent` are
392// intentionally made to ensure that `Accessor` is ideally only used in the
393// context that TLS variables are actually set. For example host functions are
394// given `&Accessor`, not `Accessor`, and this prevents them from persisting
395// the value outside of a future. Within the future the TLS variables are all
396// guaranteed to be set while the future is being polled.
397//
398// Finally though this is not an ironclad guarantee, but nor does it need to be.
399// The TLS APIs are designed to panic or otherwise model usage where they're
400// called recursively or similar. It's hoped that code cannot be constructed to
401// actually hit this at runtime but this is not a safety requirement at this
402// time.
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` itself
// is not `Sync` (`UnsafeCell<u32>` is used as such a `T`).
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
407
impl<T> Accessor<T> {
    /// Creates a new `Accessor` whose data projection is the identity (i.e.
    /// it yields the store data itself).
    ///
    /// - `token`: used by `Accessor::with` to obtain a `StoreContextMut<T>`
    ///   from the ambient store
    ///
    /// - `instance`: the `Instance` to which this `Accessor` (and the future
    ///   which closes over it) belongs, if any
    pub(crate) fn new(token: StoreToken<T>, instance: Option<Instance>) -> Self {
        Self {
            token,
            get_data: |x| x,
            instance,
        }
    }
}
428
429impl<T, D> Accessor<T, D>
430where
431 D: HasData + ?Sized,
432{
433 /// Run the specified closure, passing it mutable access to the store.
434 ///
435 /// This function is one of the main building blocks of the [`Accessor`]
436 /// type. This yields synchronous, blocking, access to store via an
437 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
438 /// providing the ability to access `D` via [`Access::get`]. Note that the
439 /// `fun` here is given only temporary access to the store and `T`/`D`
440 /// meaning that the return value `R` here is not allowed to capture borrows
441 /// into the two. If access is needed to data within `T` or `D` outside of
442 /// this closure then it must be `clone`d out, for example.
443 ///
444 /// # Panics
445 ///
446 /// This function will panic if it is call recursively with any other
447 /// accessor already in scope. For example if `with` is called within `fun`,
448 /// then this function will panic. It is up to the embedder to ensure that
449 /// this does not happen.
450 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
451 tls::get(|vmstore| {
452 fun(Access {
453 store: self.token.as_context_mut(vmstore),
454 accessor: self,
455 })
456 })
457 }
458
459 /// Changes this accessor to access `D2` instead of the current type
460 /// parameter `D`.
461 ///
462 /// This changes the underlying data access from `T` to `D2::Data<'_>`.
463 ///
464 /// Note that this is not a public or recommended API because it's easy to
465 /// cause panics with this by having two `Accessor` values live at the same
466 /// time. The returned `Accessor` does not refer to this `Accessor` meaning
467 /// that both can be used. You could, for example, call `Accessor::with`
468 /// simultaneously on both. That would cause a panic though.
469 ///
470 /// In short while there's nothing unsafe about this it's a footgun. It's
471 /// here for bindings generation where the provided accessor is transformed
472 /// into a new accessor and then this returned accessor is passed to
473 /// implementations.
474 ///
475 /// Note that one possible fix for this would be a lifetime parameter on
476 /// `Accessor` itself so the returned value could borrow from the original
477 /// value (or this could be `self`-by-value instead of `&mut self`) but in
478 /// attempting that it was found to be a bit too onerous in terms of
479 /// plumbing things around without a whole lot of benefit.
480 ///
481 /// In short, this works, but must be treated with care. The current main
482 /// user, bindings generation, treats this with care.
483 #[doc(hidden)]
484 pub fn with_data<D2: HasData>(&self, get_data: fn(&mut T) -> D2::Data<'_>) -> Accessor<T, D2> {
485 Accessor {
486 token: self.token,
487 get_data,
488 instance: self.instance,
489 }
490 }
491
492 /// Spawn a background task which will receive an `&Accessor<T, D>` and
493 /// run concurrently with any other tasks in progress for the current
494 /// instance.
495 ///
496 /// This is particularly useful for host functions which return a `stream`
497 /// or `future` such that the code to write to the write end of that
498 /// `stream` or `future` must run after the function returns.
499 ///
500 /// The returned [`AbortHandle`] may be used to cancel the task.
501 ///
502 /// # Panics
503 ///
504 /// Panics if called within a closure provided to the [`Accessor::with`]
505 /// function. This can only be called outside an active invocation of
506 /// [`Accessor::with`].
507 pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> AbortHandle
508 where
509 T: 'static,
510 {
511 let instance = self.instance.unwrap();
512 let accessor = self.clone_for_spawn();
513 self.with(|mut access| {
514 instance.spawn_with_accessor(access.as_context_mut(), accessor, task)
515 })
516 }
517
518 /// Retrieve the component instance of the caller.
519 pub fn instance(&self) -> Instance {
520 self.instance.unwrap()
521 }
522
523 fn clone_for_spawn(&self) -> Self {
524 Self {
525 token: self.token,
526 get_data: self.get_data,
527 instance: self.instance,
528 }
529 }
530}
531
532/// Represents a task which may be provided to `Accessor::spawn`,
533/// `Accessor::forward`, or `Instance::spawn`.
534// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
535// option.
536//
537// `AsyncFnOnce` is still nightly-only in latest stable Rust version as of this
538// writing (1.84.1), and even with 1.85.0-beta it's not possible to specify
539// e.g. `Send` and `Sync` bounds on the `Future` type returned by an
540// `AsyncFnOnce`. Also, using `F: Future<Output = Result<()>> + Send + Sync,
541// FN: FnOnce(&Accessor<T>) -> F + Send + Sync + 'static` fails with a type
542// mismatch error when we try to pass it an async closure (e.g. `async move |_|
543// { ... }`). So this seems to be the best we can do for the time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and returns a `Send` future which resolves to `R`.
    /// The `accessor` is how the task reaches the store while it is polled.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
551
/// Represents the state of a waitable handle.
///
/// Entries of this type are stored alongside handles in the per-instance
/// waitable tables (see the `waitable_tables` lookup in
/// `ConcurrentState::handle_callback_code`).
#[derive(Debug)]
enum WaitableState {
    /// Represents a host task handle.
    HostTask,
    /// Represents a guest task handle.
    GuestTask,
    /// Represents a stream handle.
    Stream(TypeStreamTableIndex, StreamFutureState),
    /// Represents a future handle.
    Future(TypeFutureTableIndex, StreamFutureState),
    /// Represents a waitable-set handle.
    Set,
}
566
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// The raw core Wasm parameter values supplied by the caller.
        params: Vec<ValRaw>,
        // NOTE(review): presumably whether the callee produces a result the
        // caller must receive -- confirm against the code that consumes it.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// The raw core Wasm parameter values supplied by the caller.
        params: Vec<ValRaw>,
        // NOTE(review): presumably the number of flat result values expected
        // by the caller -- confirm against the code that consumes it.
        result_count: u32,
    },
}
581
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// Carries the suspended fiber.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    ///
    /// Carries a (sub-)component instance index.
    Callback(RuntimeComponentInstanceIndex),
}
590
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The guest task doing the waiting.
        task: TableId<GuestTask>,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding { task: TableId<GuestTask> },
}
607
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The (sub-)component instance in which the task has most recently
        /// been executing.
        ///
        /// Note that this might not be the same as the instance the guest task
        /// started executing in given that one or more synchronous guest->guest
        /// calls may have occurred involving multiple instances.
        instance: RuntimeComponentInstanceIndex,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// The closure is one-shot and receives the store and the component
    /// instance.
    Start(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>),
}
630
631impl fmt::Debug for GuestCallKind {
632 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
633 match self {
634 Self::DeliverEvent { instance, set } => f
635 .debug_struct("DeliverEvent")
636 .field("instance", instance)
637 .field("set", set)
638 .finish(),
639 Self::Start(_) => f.debug_tuple("Start").finish(),
640 }
641 }
642}
643
/// Represents a pending call into guest code for a given guest task.
#[derive(Debug)]
struct GuestCall {
    /// The guest task on whose behalf the call is made.
    task: TableId<GuestTask>,
    /// What kind of call this is: event delivery or starting a new task.
    kind: GuestCallKind,
}
650
651impl GuestCall {
652 /// Returns whether or not the call is ready to run.
653 ///
654 /// A call will not be ready to run if either:
655 ///
656 /// - the (sub-)component instance to be called has already been entered and
657 /// cannot be reentered until an in-progress call completes
658 ///
659 /// - the call is for a not-yet started task and the (sub-)component
660 /// instance to be called has backpressure enabled
661 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
662 let task_instance = state.get(self.task)?.instance;
663 let state = state.instance_state(task_instance);
664 let ready = match &self.kind {
665 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
666 GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure),
667 };
668 log::trace!(
669 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
670 state.do_not_enter,
671 state.backpressure
672 );
673 Ok(ready)
674 }
675}
676
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure which receives the store and the component instance.
    // NOTE(review): the `Mutex` presumably exists only to make the `Send`-only
    // closure usable where `Sync` is required -- confirm.
    Function(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}
682
/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
///
/// This is the payload of `WorkItem::Poll`.
#[derive(Debug)]
struct PollParams {
    /// Identifies the polling task.
    task: TableId<GuestTask>,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
    /// The (sub-)component instance in which the task has most recently been
    /// executing.
    ///
    /// Note that this might not be the same as the instance the guest task
    /// started executing in given that one or more synchronous guest->guest
    /// calls may have occurred involving multiple instances.
    instance: RuntimeComponentInstanceIndex,
}
699
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// Items are queued through `ConcurrentState::push_high_priority` and
/// `ConcurrentState::push_low_priority`.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(Mutex<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber.
    ///
    /// The closure is one-shot and receives the store and the component
    /// instance.
    WorkerFunction(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}
714
715impl fmt::Debug for WorkItem {
716 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
717 match self {
718 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
719 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
720 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
721 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
722 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
723 }
724 }
725}
726
727impl ConcurrentState {
    /// Returns the mutable state for the given (sub-)component instance,
    /// inserting a default-initialized entry first if none exists yet.
    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
        self.instance_states.entry(instance).or_default()
    }
731
    /// Stores `value` in this state's table and returns its typed id.
    ///
    /// Any error from the underlying table is returned unchanged.
    fn push<V: Send + Sync + 'static>(&mut self, value: V) -> Result<TableId<V>, TableError> {
        self.table.push(value)
    }
735
    /// Looks up the table entry identified by `id`, returning a shared
    /// reference to it or the underlying table's error.
    fn get<V: 'static>(&self, id: TableId<V>) -> Result<&V, TableError> {
        self.table.get(id)
    }
739
    /// Looks up the table entry identified by `id`, returning a mutable
    /// reference to it or the underlying table's error.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, TableError> {
        self.table.get_mut(id)
    }
743
    /// Records `child` as a child of `parent` by delegating to the underlying
    /// table's `add_child`; any error it reports is returned unchanged.
    pub fn add_child<T, U>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), TableError> {
        self.table.add_child(child, parent)
    }
751
    /// Removes the child relationship between `child` and `parent` by
    /// delegating to the underlying table's `remove_child`; any error it
    /// reports is returned unchanged.
    pub fn remove_child<T, U>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), TableError> {
        self.table.remove_child(child, parent)
    }
759
    /// Removes the table entry identified by `id` and returns the owned value,
    /// or the underlying table's error.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, TableError> {
        self.table.delete(id)
    }
763
    /// Schedules a host task future to be added to `ConcurrentState::futures`.
    ///
    /// The future is not added immediately; it is queued as a high-priority
    /// `WorkItem::PushFuture` instead (see the comment below for why).
    fn push_future(&mut self, future: HostTaskFuture) {
        // Note that we can't directly push to `ConcurrentState::futures` here
        // since this may be called from a future that's being polled inside
        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
        // so it has exclusive access while polling it. Therefore, we push a
        // work item to the "high priority" queue, which will actually push to
        // `ConcurrentState::futures` later.
        self.push_high_priority(WorkItem::PushFuture(Mutex::new(future)));
    }
773
    /// Appends `item` to the "high priority" work queue, logging it at trace
    /// level first.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }
778
    /// Appends `item` to the "low priority" work queue, logging it at trace
    /// level first.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push(item);
    }
783
784 /// Determine whether the instance associated with the specified guest task
785 /// may be entered (i.e. is not already on the async call stack).
786 ///
787 /// This is an additional check on top of the "may_enter" instance flag;
788 /// it's needed because async-lifted exports with callback functions must
789 /// not call their own instances directly or indirectly, and due to the
790 /// "stackless" nature of callback-enabled guest tasks this may happen even
791 /// if there are no activation records on the stack (i.e. the "may_enter"
792 /// field is `true`) for that instance.
793 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
794 let guest_instance = self.get(guest_task).unwrap().instance;
795
796 // Walk the task tree back to the root, looking for potential
797 // reentrance.
798 //
799 // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
800 // such that each bit represents and instance which has been entered by
801 // that task or an ancestor of that task, in which case this would be a
802 // constant time check.
803 loop {
804 match &self.get_mut(guest_task).unwrap().caller {
805 Caller::Host { .. } => break true,
806 Caller::Guest { task, instance } => {
807 if *instance == guest_instance {
808 break false;
809 } else {
810 guest_task = *task;
811 }
812 }
813 }
814 }
815 }
816
817 /// Handle the `CallbackCode` returned from an async-lifted export or its
818 /// callback.
819 ///
820 /// If `initial_call` is `true`, then the code was received from the
821 /// async-lifted export; otherwise, it was received from its callback.
822 fn handle_callback_code(
823 &mut self,
824 guest_task: TableId<GuestTask>,
825 runtime_instance: RuntimeComponentInstanceIndex,
826 code: u32,
827 initial_call: bool,
828 ) -> Result<()> {
829 let (code, set) = unpack_callback_code(code);
830
831 log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");
832
833 let task = self.get_mut(guest_task)?;
834
835 if task.lift_result.is_some() {
836 if code == callback_code::EXIT {
837 return Err(anyhow!(crate::Trap::NoAsyncResult));
838 }
839 if initial_call {
840 // Notify any current or future waiters that this subtask has
841 // started.
842 Waitable::Guest(guest_task).set_event(
843 self,
844 Some(Event::Subtask {
845 status: Status::Started,
846 }),
847 )?;
848 }
849 }
850
851 let get_set = |instance: &mut Self, handle| {
852 if handle == 0 {
853 bail!("invalid waitable-set handle");
854 }
855
856 let (set, WaitableState::Set) =
857 instance.waitable_tables[runtime_instance].get_mut_by_index(handle)?
858 else {
859 bail!("invalid waitable-set handle");
860 };
861
862 Ok(TableId::<WaitableSet>::new(set))
863 };
864
865 match code {
866 callback_code::EXIT => {
867 let task = self.get_mut(guest_task)?;
868 match &task.caller {
869 Caller::Host {
870 remove_task_automatically,
871 ..
872 } => {
873 if *remove_task_automatically {
874 log::trace!("handle_callback_code will delete task {guest_task:?}");
875 Waitable::Guest(guest_task).delete_from(self)?;
876 }
877 }
878 Caller::Guest { .. } => {
879 task.exited = true;
880 task.callback = None;
881 }
882 }
883 }
884 callback_code::YIELD => {
885 // Push this task onto the "low priority" queue so it runs after
886 // any other tasks have had a chance to run.
887 let task = self.get_mut(guest_task)?;
888 assert!(task.event.is_none());
889 task.event = Some(Event::None);
890 self.push_low_priority(WorkItem::GuestCall(GuestCall {
891 task: guest_task,
892 kind: GuestCallKind::DeliverEvent {
893 instance: runtime_instance,
894 set: None,
895 },
896 }));
897 }
898 callback_code::WAIT | callback_code::POLL => {
899 let set = get_set(self, set)?;
900
901 if self.get_mut(guest_task)?.event.is_some() || !self.get_mut(set)?.ready.is_empty()
902 {
903 // An event is immediately available; deliver it ASAP.
904 self.push_high_priority(WorkItem::GuestCall(GuestCall {
905 task: guest_task,
906 kind: GuestCallKind::DeliverEvent {
907 instance: runtime_instance,
908 set: Some(set),
909 },
910 }));
911 } else {
912 // No event is immediately available.
913 match code {
914 callback_code::POLL => {
915 // We're polling, so just yield and check whether an
916 // event has arrived after that.
917 self.push_low_priority(WorkItem::Poll(PollParams {
918 task: guest_task,
919 instance: runtime_instance,
920 set,
921 }));
922 }
923 callback_code::WAIT => {
924 // We're waiting, so register to be woken up when an
925 // event is published for this waitable set.
926 //
927 // Here we also set `GuestTask::wake_on_cancel`
928 // which allows `subtask.cancel` to interrupt the
929 // wait.
930 let old = self.get_mut(guest_task)?.wake_on_cancel.replace(set);
931 assert!(old.is_none());
932 let old = self
933 .get_mut(set)?
934 .waiting
935 .insert(guest_task, WaitMode::Callback(runtime_instance));
936 assert!(old.is_none());
937 }
938 _ => unreachable!(),
939 }
940 }
941 }
942 _ => bail!("unsupported callback code: {code}"),
943 }
944
945 Ok(())
946 }
947
948 /// Record that we're about to enter a (sub-)component instance which does
949 /// not support more than one concurrent, stackful activation, meaning it
950 /// cannot be entered again until the next call returns.
951 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
952 self.instance_state(instance).do_not_enter = true;
953 }
954
955 /// Record that we've exited a (sub-)component instance previously entered
956 /// with `Self::enter_instance` and then calls `Self::partition_pending`.
957 /// See the documentation for the latter for details.
958 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
959 self.instance_state(instance).do_not_enter = false;
960 self.partition_pending(instance)
961 }
962
963 /// Iterate over `InstanceState::pending`, moving any ready items into the
964 /// "high priority" work item queue.
965 ///
966 /// See `GuestCall::is_ready` for details.
967 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
968 for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
969 let call = GuestCall { task, kind };
970 if call.is_ready(self)? {
971 self.push_high_priority(WorkItem::GuestCall(call));
972 } else {
973 self.instance_state(instance)
974 .pending
975 .insert(call.task, call.kind);
976 }
977 }
978
979 Ok(())
980 }
981
982 /// Get the next pending event for the specified task and (optional)
983 /// waitable set, along with the waitable handle if applicable.
984 fn get_event(
985 &mut self,
986 guest_task: TableId<GuestTask>,
987 instance: RuntimeComponentInstanceIndex,
988 set: Option<TableId<WaitableSet>>,
989 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
990 Ok(
991 if let Some(event) = self.get_mut(guest_task)?.event.take() {
992 log::trace!("deliver event {event:?} to {guest_task:?}");
993
994 Some((event, None))
995 } else if let Some((set, waitable)) = set
996 .and_then(|set| {
997 self.get_mut(set)
998 .map(|v| v.ready.pop_first().map(|v| (set, v)))
999 .transpose()
1000 })
1001 .transpose()?
1002 {
1003 let event = waitable.common(self)?.event.take().unwrap();
1004
1005 log::trace!(
1006 "deliver event {event:?} to {guest_task:?} for {waitable:?}; set {set:?}"
1007 );
1008
1009 let entry = self.waitable_tables[instance].get_mut_by_rep(waitable.rep());
1010 let Some((
1011 handle,
1012 WaitableState::HostTask
1013 | WaitableState::GuestTask
1014 | WaitableState::Stream(..)
1015 | WaitableState::Future(..),
1016 )) = entry
1017 else {
1018 bail!("handle not found for waitable rep {waitable:?} instance {instance:?}");
1019 };
1020
1021 waitable.on_delivery(self, event);
1022
1023 Some((event, Some((waitable, handle))))
1024 } else {
1025 None
1026 },
1027 )
1028 }
1029
1030 /// Implements the `backpressure.set` intrinsic.
1031 pub(crate) fn backpressure_set(
1032 &mut self,
1033 caller_instance: RuntimeComponentInstanceIndex,
1034 enabled: u32,
1035 ) -> Result<()> {
1036 let state = self.instance_state(caller_instance);
1037 let old = state.backpressure;
1038 let new = enabled != 0;
1039 state.backpressure = new;
1040
1041 if old && !new {
1042 // Backpressure was previously enabled and is now disabled; move any
1043 // newly-eligible guest calls to the "high priority" queue.
1044 self.partition_pending(caller_instance)?;
1045 }
1046
1047 Ok(())
1048 }
1049
1050 /// Implements the `waitable-set.new` intrinsic.
1051 pub(crate) fn waitable_set_new(
1052 &mut self,
1053 caller_instance: RuntimeComponentInstanceIndex,
1054 ) -> Result<u32> {
1055 let set = self.push(WaitableSet::default())?;
1056 let handle = self.waitable_tables[caller_instance].insert(set.rep(), WaitableState::Set)?;
1057 log::trace!("new waitable set {set:?} (handle {handle})");
1058 Ok(handle)
1059 }
1060
1061 /// Implements the `waitable-set.drop` intrinsic.
1062 pub(crate) fn waitable_set_drop(
1063 &mut self,
1064 caller_instance: RuntimeComponentInstanceIndex,
1065 set: u32,
1066 ) -> Result<()> {
1067 let (rep, WaitableState::Set) =
1068 self.waitable_tables[caller_instance].remove_by_index(set)?
1069 else {
1070 bail!("invalid waitable-set handle");
1071 };
1072
1073 log::trace!("drop waitable set {rep} (handle {set})");
1074
1075 let set = self.delete(TableId::<WaitableSet>::new(rep))?;
1076
1077 if !set.waiting.is_empty() {
1078 bail!("cannot drop waitable set with waiters");
1079 }
1080
1081 Ok(())
1082 }
1083
1084 /// Implements the `waitable.join` intrinsic.
1085 pub(crate) fn waitable_join(
1086 &mut self,
1087 caller_instance: RuntimeComponentInstanceIndex,
1088 waitable_handle: u32,
1089 set_handle: u32,
1090 ) -> Result<()> {
1091 let waitable = Waitable::from_instance(self, caller_instance, waitable_handle)?;
1092
1093 let set = if set_handle == 0 {
1094 None
1095 } else {
1096 let (set, WaitableState::Set) =
1097 self.waitable_tables[caller_instance].get_mut_by_index(set_handle)?
1098 else {
1099 bail!("invalid waitable-set handle");
1100 };
1101
1102 Some(TableId::<WaitableSet>::new(set))
1103 };
1104
1105 log::trace!(
1106 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
1107 );
1108
1109 waitable.join(self, set)
1110 }
1111
1112 /// Implements the `subtask.drop` intrinsic.
1113 pub(crate) fn subtask_drop(
1114 &mut self,
1115 caller_instance: RuntimeComponentInstanceIndex,
1116 task_id: u32,
1117 ) -> Result<()> {
1118 self.waitable_join(caller_instance, task_id, 0)?;
1119
1120 let (rep, state) = self.waitable_tables[caller_instance].remove_by_index(task_id)?;
1121
1122 let (waitable, expected_caller_instance, delete) = match state {
1123 WaitableState::HostTask => {
1124 let id = TableId::<HostTask>::new(rep);
1125 let task = self.get(id)?;
1126 if task.abort_handle.is_some() {
1127 bail!("cannot drop a subtask which has not yet resolved");
1128 }
1129 (Waitable::Host(id), task.caller_instance, true)
1130 }
1131 WaitableState::GuestTask => {
1132 let id = TableId::<GuestTask>::new(rep);
1133 let task = self.get(id)?;
1134 if task.lift_result.is_some() {
1135 bail!("cannot drop a subtask which has not yet resolved");
1136 }
1137 if let Caller::Guest { instance, .. } = &task.caller {
1138 (Waitable::Guest(id), *instance, task.exited)
1139 } else {
1140 unreachable!()
1141 }
1142 }
1143 _ => bail!("invalid task handle: {task_id}"),
1144 };
1145
1146 if waitable.take_event(self)?.is_some() {
1147 bail!("cannot drop a subtask with an undelivered event");
1148 }
1149
1150 if delete {
1151 waitable.delete_from(self)?;
1152 }
1153
1154 // Since waitables can neither be passed between instances nor forged,
1155 // this should never fail unless there's a bug in Wasmtime, but we check
1156 // here to be sure:
1157 assert_eq!(expected_caller_instance, caller_instance);
1158 log::trace!("subtask_drop {waitable:?} (handle {task_id})");
1159 Ok(())
1160 }
1161
1162 /// Implements the `context.get` intrinsic.
1163 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
1164 let task = self.guest_task.unwrap();
1165 let val = self.get(task)?.context[usize::try_from(slot).unwrap()];
1166 log::trace!("context_get {task:?} slot {slot} val {val:#x}");
1167 Ok(val)
1168 }
1169
1170 /// Implements the `context.set` intrinsic.
1171 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
1172 let task = self.guest_task.unwrap();
1173 log::trace!("context_set {task:?} slot {slot} val {val:#x}");
1174 self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
1175 Ok(())
1176 }
1177
1178 fn options(&self, options: OptionsIndex) -> &CanonicalOptions {
1179 &self.component.env_component().options[options]
1180 }
1181}
1182
1183impl Instance {
1184 /// Enable or disable concurrent state debugging mode for e.g. integration
1185 /// tests.
1186 ///
1187 /// This will avoid re-using deleted handles, making it easier to catch
1188 /// e.g. "use-after-delete" and "double-delete" errors. It can also make
1189 /// reading trace output easier since it ensures handles are never
1190 /// repurposed.
1191 #[doc(hidden)]
1192 pub fn enable_concurrent_state_debug(&self, mut store: impl AsContextMut, enable: bool) {
1193 self.id()
1194 .get_mut(store.as_context_mut().0)
1195 .concurrent_state_mut()
1196 .table
1197 .enable_debug(enable);
1198 // TODO: do the same for the tables holding guest-facing handles
1199 }
1200
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this instance are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes. This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked tables, queues, or fields is non-empty.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) {
        let state = self
            .id()
            .get_mut(store.as_context_mut().0)
            .concurrent_state_mut();
        // The main table; its contents are included in the panic message to
        // help identify what leaked.
        assert!(state.table.is_empty(), "non-empty table: {:?}", state.table);
        // Both work item queues must have been drained.
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        // No guest task may still be recorded as the current one.
        assert!(state.guest_task.is_none());
        // The set of in-flight futures must be present and empty.
        assert!(
            state
                .futures
                .get_mut()
                .unwrap()
                .as_ref()
                .unwrap()
                .is_empty()
        );
        // Per-instance tables of guest-facing waitable handles.
        assert!(
            state
                .waitable_tables
                .iter()
                .all(|(_, table)| table.is_empty())
        );
        // Per-instance maps of guest calls which were waiting to run.
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        // Per-instance error-context tables and their global reference counts.
        assert!(
            state
                .error_context_tables
                .iter()
                .all(|(_, table)| table.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }
1248
    /// Run the specified closure `fun` to completion as part of this instance's
    /// event loop.
    ///
    /// Like [`Self::run`], this will run `fun` as part of this instance's event
    /// loop until it yields a result _or_ there are no more tasks to run.
    /// Unlike [`Self::run`], `fun` is provided an [`Accessor`], which provides
    /// controlled access to the `Store` and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Errors
    ///
    /// Returns an error if the event loop fails, e.g. because one of its
    /// tasks returned an error or because no further progress could be made
    /// before the future returned by `fun` completed.
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   anyhow::{Result},
    /// #   wasmtime::{
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// instance.run_concurrent(&mut store, async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<T, R>(
        self,
        mut store: impl AsContextMut<Data = T>,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: 'static,
    {
        // NOTE(review): presumably rejects running an event loop from within
        // another one -- confirm against `check_recursive_run`.
        check_recursive_run();
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());

        // Owns the future produced by `fun` together with the store, and
        // drops that future inside a `tls::set` scope rather than wherever
        // the enclosing frame happens to be torn down.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0.traitobj_mut(), || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token, Some(self));
        // Calling `fun` only creates the future; it is driven by `poll_until`
        // below.
        let dropper = &mut Dropper {
            store,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        self.poll_until(dropper.store.as_context_mut(), future)
            .await
    }
1333
1334 /// Spawn a background task to run as part of this instance's event loop.
1335 ///
1336 /// The task will receive an `&Accessor<U>` and run concurrently with
1337 /// any other tasks in progress for the instance.
1338 ///
1339 /// Note that the task will only make progress if and when the event loop
1340 /// for this instance is run.
1341 ///
1342 /// The returned [`SpawnHandle`] may be used to cancel the task.
1343 pub fn spawn<U: 'static>(
1344 self,
1345 mut store: impl AsContextMut<Data = U>,
1346 task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
1347 ) -> AbortHandle {
1348 let mut store = store.as_context_mut();
1349 let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
1350 self.spawn_with_accessor(store, accessor, task)
1351 }
1352
1353 /// Internal implementation of `spawn` functions where a `store` is
1354 /// available along with an `Accessor`.
1355 fn spawn_with_accessor<T, D>(
1356 self,
1357 mut store: StoreContextMut<T>,
1358 accessor: Accessor<T, D>,
1359 task: impl AccessorTask<T, D, Result<()>>,
1360 ) -> AbortHandle
1361 where
1362 T: 'static,
1363 D: HasData + ?Sized,
1364 {
1365 let store = store.as_context_mut();
1366
1367 // Create an "abortable future" here where internally the future will
1368 // hook calls to poll and possibly spawn more background tasks on each
1369 // iteration.
1370 let (handle, future) =
1371 AbortHandle::run(async move { HostTaskOutput::Result(task.run(&accessor).await) });
1372 self.concurrent_state_mut(store.0)
1373 .push_future(Box::pin(async move {
1374 future.await.unwrap_or(HostTaskOutput::Result(Ok(())))
1375 }));
1376
1377 handle
1378 }
1379
    /// Run this instance's event loop.
    ///
    /// The returned future will resolve when either the specified future
    /// completes (in which case we return its result) or no further progress
    /// can be made (in which case we trap with `Trap::AsyncDeadlock`).
    ///
    /// Each iteration of the loop polls `future`, then the futures held in
    /// `ConcurrentState::futures`, then collects the "high priority" work
    /// items (or, if there are none, the "low priority" ones) and handles
    /// them before looping again.
    ///
    /// # Errors
    ///
    /// Besides `Trap::AsyncDeadlock`, this returns any error produced by one
    /// of the futures in `ConcurrentState::futures` or by handling a work
    /// item.
    async fn poll_until<T, R>(
        self,
        store: StoreContextMut<'_, T>,
        mut future: Pin<&mut impl Future<Output = R>>,
    ) -> Result<R> {
        loop {
            // Take `ConcurrentState::futures` out of the instance so we can
            // poll it while also safely giving any of the futures inside access
            // to `self`.
            let mut futures = self
                .concurrent_state_mut(store.0)
                .futures
                .get_mut()
                .unwrap()
                .take()
                .unwrap();
            let mut next = pin!(futures.next());

            // The closure below yields `Either::Left(value)` once `future`
            // has completed and `Either::Right(items)` when there are work
            // items (possibly none) to handle before polling again.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The outcome is summarized as `Poll<bool>`: `Ready(true)` if
                // one of the futures completed, `Ready(false)` if there are
                // no futures left, and `Pending` otherwise.
                let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            HostTaskOutput::Result(Err(e)) => return Poll::Ready(Err(e)),
                            HostTaskOutput::Result(Ok(())) => {}
                            HostTaskOutput::Function(fun) => {
                                // Defer calling this function to a worker fiber
                                // in case it involves calling a guest realloc
                                // function as part of a lowering operation.
                                //
                                // TODO: This isn't necessary for _all_
                                // `HostTaskOutput::Function`s, so we could
                                // optimize by adding another variant to
                                // `HostTaskOutput` to distinguish which ones
                                // need it and which don't.
                                self.concurrent_state_mut(store.0)
                                    .push_high_priority(WorkItem::WorkerFunction(Mutex::new(fun)))
                            }
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                let mut instance = self.id().get_mut(store.0);

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = instance.as_mut().concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    self.set_tls(store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed, meaning we're
                                    // stuck, so we return an error.  The
                                    // underlying assumption is that `future`
                                    // depends on this component instance making
                                    // such progress, and thus there's no point
                                    // in continuing to poll it given we've run
                                    // out of work to do.
                                    //
                                    // Note that we'd also reach this point if
                                    // the host embedder passed e.g. a
                                    // `std::future::Pending` to
                                    // `Instance::run_concurrent`, in which case
                                    // we'd return a "deadlock" error even when
                                    // any and all tasks have completed
                                    // normally.  However, that's not how
                                    // `Instance::run_concurrent` is intended
                                    // (and documented) to be used, so it seems
                                    // reasonable to lump that case in with
                                    // "real" deadlocks.
                                    //
                                    // TODO: Once we've added host APIs for
                                    // cancelling in-progress tasks, we can
                                    // return some other, non-error value here,
                                    // treating it as "normal" and giving the
                                    // host embedder a chance to intervene by
                                    // cancelling one or more tasks and/or
                                    // starting new tasks capable of waking the
                                    // existing ones.
                                    Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the instance before
            // we return or handle any work items since one or more of those
            // items might append more futures.
            *self
                .concurrent_state_mut(store.0)
                .futures
                .get_mut()
                .unwrap() = Some(futures);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    for item in ready {
                        self.handle_work_item(store.0.traitobj_mut(), item).await?;
                    }
                }
            }
        }
    }
1542
1543 /// Handle the specified work item, possibly resuming a fiber if applicable.
1544 async fn handle_work_item(self, store: &mut StoreOpaque, item: WorkItem) -> Result<()> {
1545 log::trace!("handle work item {item:?}");
1546 match item {
1547 WorkItem::PushFuture(future) => {
1548 self.concurrent_state_mut(store)
1549 .futures
1550 .get_mut()
1551 .unwrap()
1552 .as_mut()
1553 .unwrap()
1554 .push(future.into_inner().unwrap());
1555 }
1556 WorkItem::ResumeFiber(fiber) => {
1557 self.resume_fiber(store, fiber).await?;
1558 }
1559 WorkItem::GuestCall(call) => {
1560 let state = self.concurrent_state_mut(store);
1561 if call.is_ready(state)? {
1562 self.run_on_worker(store, WorkerItem::GuestCall(call))
1563 .await?;
1564 } else {
1565 let task = state.get_mut(call.task)?;
1566 if !task.starting_sent {
1567 task.starting_sent = true;
1568 if let GuestCallKind::Start(_) = &call.kind {
1569 Waitable::Guest(call.task).set_event(
1570 state,
1571 Some(Event::Subtask {
1572 status: Status::Starting,
1573 }),
1574 )?;
1575 }
1576 }
1577
1578 let runtime_instance = state.get(call.task)?.instance;
1579 state
1580 .instance_state(runtime_instance)
1581 .pending
1582 .insert(call.task, call.kind);
1583 }
1584 }
1585 WorkItem::Poll(params) => {
1586 let state = self.concurrent_state_mut(store);
1587 if state.get_mut(params.task)?.event.is_some()
1588 || !state.get_mut(params.set)?.ready.is_empty()
1589 {
1590 // There's at least one event immediately available; deliver
1591 // it to the guest ASAP.
1592 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1593 task: params.task,
1594 kind: GuestCallKind::DeliverEvent {
1595 instance: params.instance,
1596 set: Some(params.set),
1597 },
1598 }));
1599 } else {
1600 // There are no events immediately available; deliver
1601 // `Event::None` to the guest.
1602 state.get_mut(params.task)?.event = Some(Event::None);
1603 state.push_high_priority(WorkItem::GuestCall(GuestCall {
1604 task: params.task,
1605 kind: GuestCallKind::DeliverEvent {
1606 instance: params.instance,
1607 set: Some(params.set),
1608 },
1609 }));
1610 }
1611 }
1612 WorkItem::WorkerFunction(fun) => {
1613 self.run_on_worker(store, WorkerItem::Function(fun)).await?;
1614 }
1615 }
1616
1617 Ok(())
1618 }
1619
1620 /// Resume the specified fiber, giving it exclusive access to the specified
1621 /// store.
1622 async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> {
1623 let old_task = self.concurrent_state_mut(store).guest_task;
1624 log::trace!("resume_fiber: save current task {old_task:?}");
1625
1626 let fiber = fiber::resolve_or_release(store, fiber).await?;
1627
1628 let state = self.concurrent_state_mut(store);
1629
1630 state.guest_task = old_task;
1631 log::trace!("resume_fiber: restore current task {old_task:?}");
1632
1633 if let Some(mut fiber) = fiber {
1634 // See the `SuspendReason` documentation for what each case means.
1635 match state.suspend_reason.take().unwrap() {
1636 SuspendReason::NeedWork => {
1637 if state.worker.is_none() {
1638 state.worker = Some(fiber);
1639 } else {
1640 fiber.dispose(store);
1641 }
1642 }
1643 SuspendReason::Yielding { .. } => {
1644 state.push_low_priority(WorkItem::ResumeFiber(fiber));
1645 }
1646 SuspendReason::Waiting { set, task } => {
1647 let old = state
1648 .get_mut(set)?
1649 .waiting
1650 .insert(task, WaitMode::Fiber(fiber));
1651 assert!(old.is_none());
1652 }
1653 }
1654 }
1655
1656 Ok(())
1657 }
1658
1659 /// Execute the specified guest call on a worker fiber.
1660 async fn run_on_worker(self, store: &mut StoreOpaque, item: WorkerItem) -> Result<()> {
1661 let worker = if let Some(fiber) = self.concurrent_state_mut(store).worker.take() {
1662 fiber
1663 } else {
1664 fiber::make_fiber(store.traitobj_mut(), move |store| {
1665 loop {
1666 match self.concurrent_state_mut(store).worker_item.take().unwrap() {
1667 WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
1668 WorkerItem::Function(fun) => fun.into_inner().unwrap()(store, self)?,
1669 }
1670
1671 self.suspend(store, SuspendReason::NeedWork)?;
1672 }
1673 })?
1674 };
1675
1676 let worker_item = &mut self.concurrent_state_mut(store).worker_item;
1677 assert!(worker_item.is_none());
1678 *worker_item = Some(item);
1679
1680 self.resume_fiber(store, worker).await
1681 }
1682
1683 /// Execute the specified guest call.
1684 fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
1685 match call.kind {
1686 GuestCallKind::DeliverEvent {
1687 instance: runtime_instance,
1688 set,
1689 } => {
1690 let state = self.concurrent_state_mut(store);
1691 let (event, waitable) = state.get_event(call.task, runtime_instance, set)?.unwrap();
1692 let task = state.get_mut(call.task)?;
1693 let runtime_instance = task.instance;
1694 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
1695
1696 log::trace!(
1697 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
1698 call.task,
1699 );
1700
1701 let old_task = state.guest_task.replace(call.task);
1702 log::trace!(
1703 "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
1704 call.task
1705 );
1706
1707 self.maybe_push_call_context(store.store_opaque_mut(), call.task)?;
1708
1709 let state = self.concurrent_state_mut(store);
1710 state.enter_instance(runtime_instance);
1711
1712 let callback = state.get_mut(call.task)?.callback.take().unwrap();
1713
1714 let code = callback(store, self, runtime_instance, event, handle)?;
1715
1716 let state = self.concurrent_state_mut(store);
1717
1718 state.get_mut(call.task)?.callback = Some(callback);
1719
1720 state.exit_instance(runtime_instance)?;
1721
1722 self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?;
1723
1724 let state = self.concurrent_state_mut(store);
1725 state.handle_callback_code(call.task, runtime_instance, code, false)?;
1726
1727 state.guest_task = old_task;
1728 log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
1729 }
1730 GuestCallKind::Start(fun) => {
1731 fun(store, self)?;
1732 }
1733 }
1734
1735 Ok(())
1736 }
1737
1738 /// Suspend the current fiber, storing the reason in
1739 /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1740 /// it should be resumed.
1741 ///
1742 /// See the `SuspendReason` documentation for details.
1743 fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> {
1744 log::trace!("suspend fiber: {reason:?}");
1745
1746 // If we're yielding or waiting on behalf of a guest task, we'll need to
1747 // pop the call context which manages resource borrows before suspending
1748 // and then push it again once we've resumed.
1749 let task = match &reason {
1750 SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
1751 SuspendReason::NeedWork => None,
1752 };
1753
1754 let old_guest_task = if let Some(task) = task {
1755 self.maybe_pop_call_context(store.store_opaque_mut(), task)?;
1756 self.concurrent_state_mut(store).guest_task
1757 } else {
1758 None
1759 };
1760
1761 let suspend_reason = &mut self.concurrent_state_mut(store).suspend_reason;
1762 assert!(suspend_reason.is_none());
1763 *suspend_reason = Some(reason);
1764
1765 store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1766
1767 if let Some(task) = task {
1768 self.concurrent_state_mut(store).guest_task = old_guest_task;
1769 self.maybe_push_call_context(store.store_opaque_mut(), task)?;
1770 }
1771
1772 Ok(())
1773 }
1774
1775 /// Push the call context for managing resource borrows for the specified
1776 /// guest task if it has not yet either returned a result or cancelled
1777 /// itself.
1778 fn maybe_push_call_context(
1779 self,
1780 store: &mut StoreOpaque,
1781 guest_task: TableId<GuestTask>,
1782 ) -> Result<()> {
1783 let task = self.concurrent_state_mut(store).get_mut(guest_task)?;
1784 if task.lift_result.is_some() {
1785 log::trace!("push call context for {guest_task:?}");
1786 let call_context = task.call_context.take().unwrap();
1787 store.component_resource_state().0.push(call_context);
1788 }
1789 Ok(())
1790 }
1791
1792 /// Pop the call context for managing resource borrows for the specified
1793 /// guest task if it has not yet either returned a result or cancelled
1794 /// itself.
1795 fn maybe_pop_call_context(
1796 self,
1797 store: &mut StoreOpaque,
1798 guest_task: TableId<GuestTask>,
1799 ) -> Result<()> {
1800 if self
1801 .concurrent_state_mut(store)
1802 .get(guest_task)?
1803 .lift_result
1804 .is_some()
1805 {
1806 log::trace!("pop call context for {guest_task:?}");
1807 let call_context = Some(store.component_resource_state().0.pop().unwrap());
1808 self.concurrent_state_mut(store)
1809 .get_mut(guest_task)?
1810 .call_context = call_context;
1811 }
1812 Ok(())
1813 }
1814
/// Add the specified guest call to the "high priority" work item queue, to
/// be started as soon as backpressure and/or reentrance rules allow.
///
/// The work item is a boxed closure pushed as `GuestCallKind::Start`. Two
/// flavors of closure are built, selected by whether `callback` is present:
///
/// - With a callback ("stackless"): `async_` is asserted to be true. The
///   closure makes `guest_task` current, calls `callee`, restores the
///   previously current task, and passes the returned `i32` to
///   `handle_callback_code`.
///
/// - Without a callback ("stackful"): if `async_` is true the callee is
///   expected to have consumed `lift_result` (via `task.return` or
///   `task.cancel`) by the time it returns, otherwise
///   `Trap::NoAsyncResult` is raised. If `async_` is false the export is
///   sync-lifted: the result is lifted, `post_return` (if any) is
///   optionally called, and `task_complete` is invoked with
///   `Status::Returned`.
///
/// Parameters:
///
/// - `guest_task`: the task on whose behalf `callee` is called.
/// - `callee`: the guest function to call.
/// - `param_count` / `result_count`: number of core Wasm values passed to
///   and returned from `callee`; both index into a
///   `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` buffer.
/// - `flags`: if present, `may_enter` is cleared for the duration of the
///   call and afterwards set to the task's
///   `call_post_return_automatically()` value.
/// - `post_return`: only consulted on the sync-lifted path.
///
/// SAFETY: The raw pointer arguments must be valid references to guest
/// functions (with the appropriate signatures) when the closures queued by
/// this function are called.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_task: TableId<GuestTask>,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    flags: Option<InstanceFlags>,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Return a closure which will call the specified function in the scope
    /// of the specified task.
    ///
    /// This will use `GuestTask::lower_params` to lower the parameters, but
    /// will not lift the result; instead, it returns a
    /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
    /// any, may be lifted. Note that an async-lifted export will have
    /// returned its result using the `task.return` intrinsic (or not
    /// returned a result at all, in the case of `task.cancel`), in which
    /// case the "result" of this call will either be a callback code or
    /// nothing.
    ///
    /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
    /// the returned closure is called.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_task: TableId<GuestTask>,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        flags: Option<InstanceFlags>,
    ) -> impl FnOnce(
        &mut dyn VMStore,
        Instance,
    ) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore, instance: Instance| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
            let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
            // Capture the value `may_enter` should be set to once the call
            // returns before the task is otherwise touched.
            let may_enter_after_call = task.call_post_return_automatically();
            // `lower_params` is one-shot: it is taken out of the task here
            // and this closure panics if it has already been consumed.
            let lower = task.lower_params.take().unwrap();

            lower(store, instance, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            // SAFETY: Per the contract documented in `make_call`'s
            // documentation, `callee` must be a valid pointer.
            unsafe {
                if let Some(mut flags) = flags {
                    flags.set_may_enter(false);
                }
                // The same buffer is used for parameters and results, so
                // the slice handed to the callee covers the larger of the
                // two counts.
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
                if let Some(mut flags) = flags {
                    flags.set_may_enter(may_enter_after_call);
                }
            }

            Ok(storage)
        }
    }

    // SAFETY: Per the contract described in this function documentation,
    // the `callee` pointer which `call` closes over must be valid when
    // called by the closure we queue below.
    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_task,
            callee,
            param_count,
            result_count,
            flags,
        )
    };

    let callee_instance = self.concurrent_state_mut(store.0).get(guest_task)?.instance;
    let fun = if callback.is_some() {
        // A callback implies an async-lifted export.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore, instance: Instance| {
            // Make `guest_task` the current task for the duration of the
            // call, remembering the previous one so it can be restored.
            let old_task = instance
                .concurrent_state_mut(store)
                .guest_task
                .replace(guest_task);
            log::trace!(
                "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
            );

            instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;

            instance
                .concurrent_state_mut(store)
                .enter_instance(callee_instance);

            // SAFETY: See the documentation for `make_call` to review the
            // contract we must uphold for `call` here.
            //
            // Per the contract described in the `queue_call`
            // documentation, the `callee` pointer which `call` closes
            // over must be valid.
            let storage = call(store, instance)?;

            instance
                .concurrent_state_mut(store)
                .exit_instance(callee_instance)?;

            instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;

            let state = instance.concurrent_state_mut(store);
            state.guest_task = old_task;
            log::trace!("stackless call: restored {old_task:?} as current task");

            // SAFETY: `wasmparser` will have validated that the callback
            // function returns a `i32` result.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            // Let the state machine act on the callback code the guest
            // returned.
            state.handle_callback_code(guest_task, callee_instance, code, true)?;

            Ok(())
        })
            as Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>
    } else {
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore, instance: Instance| {
            // Make `guest_task` the current task. Unlike the stackless
            // path, `old_task` is only used for logging here.
            let old_task = instance
                .concurrent_state_mut(store)
                .guest_task
                .replace(guest_task);
            log::trace!(
                "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
            );

            let mut flags = instance.id().get(store).instance_flags(callee_instance);

            instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;

            // Unless this is a callback-less (i.e. stackful)
            // async-lifted export, we need to record that the instance
            // cannot be entered until the call returns.
            if !async_ {
                instance
                    .concurrent_state_mut(store)
                    .enter_instance(callee_instance);
            }

            // SAFETY: See the documentation for `make_call` to review the
            // contract we must uphold for `call` here.
            //
            // Per the contract described in the `queue_call`
            // documentation, the `callee` pointer which `call` closes
            // over must be valid.
            let storage = call(store, instance)?;

            if async_ {
                // This is a callback-less (i.e. stackful) async-lifted
                // export, so there is no post-return function, and
                // either `task.return` or `task.cancel` should have
                // been called.
                //
                // A `lift_result` that is still present means neither
                // intrinsic was called, which is reported as a trap.
                if instance
                    .concurrent_state_mut(store)
                    .get(guest_task)?
                    .lift_result
                    .is_some()
                {
                    return Err(anyhow!(crate::Trap::NoAsyncResult));
                }
            } else {
                // This is a sync-lifted export, so now is when we lift the
                // result, optionally call the post-return function, if any,
                // and finally notify any current or future waiters that the
                // subtask has returned.

                let lift = {
                    let state = instance.concurrent_state_mut(store);
                    state.exit_instance(callee_instance)?;

                    assert!(state.get(guest_task)?.result.is_none());

                    state.get_mut(guest_task)?.lift_result.take().unwrap()
                };

                // SAFETY: `result_count` represents the number of core Wasm
                // results returned, per `wasmparser`.
                let result = (lift.lift)(store, instance, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // The post-return function takes a single argument: the
                // sole core result if there is one, otherwise a zero
                // placeholder. More than one flat result is not expected
                // here.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    // SAFETY: `result_count` represents the number of
                    // core Wasm results returned, per `wasmparser`.
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                if instance
                    .concurrent_state_mut(store)
                    .get(guest_task)?
                    .call_post_return_automatically()
                {
                    // NOTE(review): no SAFETY comment accompanied this
                    // flag update in the original; presumably `flags`
                    // refers to live instance state -- confirm.
                    unsafe { flags.set_needs_post_return(false) }

                    if let Some(func) = post_return {
                        let mut store = token.as_context_mut(store);

                        // SAFETY: `func` is a valid `*mut VMFuncRef` from
                        // either `wasmtime-cranelift`-generated fused adapter
                        // code or `component::Options`. Per `wasmparser`
                        // post-return signature validation, we know it takes a
                        // single parameter.
                        unsafe {
                            crate::Func::call_unchecked_raw(
                                &mut store,
                                func.as_non_null(),
                                slice::from_ref(&post_return_arg).into(),
                            )?;
                        }
                    }

                    // Post-return handling is done, so the instance may be
                    // entered again.
                    unsafe { flags.set_may_enter(true) }
                }

                instance.task_complete(
                    store,
                    guest_task,
                    result,
                    Status::Returned,
                    post_return_arg,
                )?;
            }

            instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;

            let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;

            // Final cleanup depends on who called the task: host-called
            // tasks may be deleted outright, guest-called tasks are only
            // marked as exited.
            match &task.caller {
                Caller::Host {
                    remove_task_automatically,
                    ..
                } => {
                    if *remove_task_automatically {
                        Waitable::Guest(guest_task)
                            .delete_from(instance.concurrent_state_mut(store))?;
                    }
                }
                Caller::Guest { .. } => {
                    task.exited = true;
                }
            }

            Ok(())
        })
    };

    // Hand the closure to the event loop; it is not run here.
    self.concurrent_state_mut(store.0)
        .push_high_priority(WorkItem::GuestCall(GuestCall {
            task: guest_task,
            kind: GuestCallKind::Start(fun),
        }));

    Ok(())
}
2100
2101 /// Prepare (but do not start) a guest->guest call.
2102 ///
2103 /// This is called from fused adapter code generated in
2104 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_`
2105 /// are synthesized Wasm functions which move the parameters from the caller
2106 /// to the callee and the result from the callee to the caller,
2107 /// respectively. The adapter will call `Self::start_call` immediately
2108 /// after calling this function.
2109 ///
2110 /// SAFETY: All the pointer arguments must be valid pointers to guest
2111 /// entities (and with the expected signatures for the function references
2112 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2113 unsafe fn prepare_call<T: 'static>(
2114 self,
2115 mut store: StoreContextMut<T>,
2116 start: *mut VMFuncRef,
2117 return_: *mut VMFuncRef,
2118 caller_instance: RuntimeComponentInstanceIndex,
2119 callee_instance: RuntimeComponentInstanceIndex,
2120 task_return_type: TypeTupleIndex,
2121 memory: *mut VMMemoryDefinition,
2122 string_encoding: u8,
2123 caller_info: CallerInfo,
2124 ) -> Result<()> {
2125 enum ResultInfo {
2126 Heap { results: u32 },
2127 Stack { result_count: u32 },
2128 }
2129
2130 let result_info = match &caller_info {
2131 CallerInfo::Async {
2132 has_result: true,
2133 params,
2134 } => ResultInfo::Heap {
2135 results: params.last().unwrap().get_u32(),
2136 },
2137 CallerInfo::Async {
2138 has_result: false, ..
2139 } => ResultInfo::Stack { result_count: 0 },
2140 CallerInfo::Sync {
2141 result_count,
2142 params,
2143 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2144 results: params.last().unwrap().get_u32(),
2145 },
2146 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2147 result_count: *result_count,
2148 },
2149 };
2150
2151 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2152
2153 // Create a new guest task for the call, closing over the `start` and
2154 // `return_` functions to lift the parameters and lower the result,
2155 // respectively.
2156 let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2157 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2158 let token = StoreToken::new(store.as_context_mut());
2159 let state = self.concurrent_state_mut(store.0);
2160 let old_task = state.guest_task.take();
2161 let new_task = GuestTask::new(
2162 state,
2163 Box::new(move |store, instance, dst| {
2164 let mut store = token.as_context_mut(store);
2165 assert!(dst.len() <= MAX_FLAT_PARAMS);
2166 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2167 let count = match caller_info {
2168 // Async callers, if they have a result, use the last
2169 // parameter as a return pointer so chop that off if
2170 // relevant here.
2171 CallerInfo::Async { params, has_result } => {
2172 let params = ¶ms[..params.len() - usize::from(has_result)];
2173 for (param, src) in params.iter().zip(&mut src) {
2174 src.write(*param);
2175 }
2176 params.len()
2177 }
2178
2179 // Sync callers forward everything directly.
2180 CallerInfo::Sync { params, .. } => {
2181 for (param, src) in params.iter().zip(&mut src) {
2182 src.write(*param);
2183 }
2184 params.len()
2185 }
2186 };
2187 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2188 // `wasmtime-cranelift`-generated fused adapter code. Based on
2189 // how it was constructed (see
2190 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2191 // for details) we know it takes count parameters and returns
2192 // `dst.len()` results.
2193 unsafe {
2194 crate::Func::call_unchecked_raw(
2195 &mut store,
2196 start.as_non_null(),
2197 NonNull::new(
2198 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2199 )
2200 .unwrap(),
2201 )?;
2202 }
2203 dst.copy_from_slice(&src[..dst.len()]);
2204 let state = instance.concurrent_state_mut(store.0);
2205 let task = state.guest_task.unwrap();
2206 Waitable::Guest(task).set_event(
2207 state,
2208 Some(Event::Subtask {
2209 status: Status::Started,
2210 }),
2211 )?;
2212 Ok(())
2213 }),
2214 LiftResult {
2215 lift: Box::new(move |store, instance, src| {
2216 // SAFETY: See comment in closure passed as `lower_params`
2217 // parameter above.
2218 let mut store = token.as_context_mut(store);
2219 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2220 if let ResultInfo::Heap { results } = &result_info {
2221 my_src.push(ValRaw::u32(*results));
2222 }
2223 // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2224 // `wasmtime-cranelift`-generated fused adapter code. Based
2225 // on how it was constructed (see
2226 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2227 // for details) we know it takes `src.len()` parameters and
2228 // returns up to 1 result.
2229 unsafe {
2230 crate::Func::call_unchecked_raw(
2231 &mut store,
2232 return_.as_non_null(),
2233 my_src.as_mut_slice().into(),
2234 )?;
2235 }
2236 let state = instance.concurrent_state_mut(store.0);
2237 let task = state.guest_task.unwrap();
2238 if sync_caller {
2239 state.get_mut(task)?.sync_result =
2240 Some(if let ResultInfo::Stack { result_count } = &result_info {
2241 match result_count {
2242 0 => None,
2243 1 => Some(my_src[0]),
2244 _ => unreachable!(),
2245 }
2246 } else {
2247 None
2248 });
2249 }
2250 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2251 }),
2252 ty: task_return_type,
2253 memory: NonNull::new(memory).map(SendSyncPtr::new),
2254 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2255 },
2256 Caller::Guest {
2257 task: old_task.unwrap(),
2258 instance: caller_instance,
2259 },
2260 None,
2261 callee_instance,
2262 )?;
2263
2264 let guest_task = state.push(new_task)?;
2265
2266 if let Some(old_task) = old_task {
2267 if !state.may_enter(guest_task) {
2268 bail!(crate::Trap::CannotEnterComponent);
2269 }
2270
2271 state.get_mut(old_task)?.subtasks.insert(guest_task);
2272 };
2273
2274 // Make the new task the current one so that `Self::start_call` knows
2275 // which one to start.
2276 state.guest_task = Some(guest_task);
2277 log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2278
2279 Ok(())
2280 }
2281
2282 /// Call the specified callback function for an async-lifted export.
2283 ///
2284 /// SAFETY: `function` must be a valid reference to a guest function of the
2285 /// correct signature for a callback.
2286 unsafe fn call_callback<T>(
2287 self,
2288 mut store: StoreContextMut<T>,
2289 callee_instance: RuntimeComponentInstanceIndex,
2290 function: SendSyncPtr<VMFuncRef>,
2291 event: Event,
2292 handle: u32,
2293 may_enter_after_call: bool,
2294 ) -> Result<u32> {
2295 let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2296
2297 let (ordinal, result) = event.parts();
2298 let params = &mut [
2299 ValRaw::u32(ordinal),
2300 ValRaw::u32(handle),
2301 ValRaw::u32(result),
2302 ];
2303 // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2304 // `wasmtime-cranelift`-generated fused adapter code or
2305 // `component::Options`. Per `wasmparser` callback signature
2306 // validation, we know it takes three parameters and returns one.
2307 unsafe {
2308 flags.set_may_enter(false);
2309 crate::Func::call_unchecked_raw(
2310 &mut store,
2311 function.as_non_null(),
2312 params.as_mut_slice().into(),
2313 )?;
2314 flags.set_may_enter(may_enter_after_call);
2315 }
2316 Ok(params[0].get_u32())
2317 }
2318
/// Start a guest->guest call previously prepared using
/// `Self::prepare_call`.
///
/// This is called from fused adapter code generated in
/// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call
/// this function immediately after calling `Self::prepare_call`.
///
/// The task to start is the current `guest_task`, which
/// `Self::prepare_call` installed. The call itself is queued via
/// `Self::queue_call` and this fiber is suspended until the callee reports
/// progress:
///
/// - If `storage` is `None` the caller is treated as async: the function
///   returns as soon as the first subtask event arrives, allocating a
///   waitable handle in the caller instance's table unless the subtask has
///   already returned.
/// - If `storage` is `Some` the caller is sync: the function keeps
///   suspending until the subtask has returned, then copies the result
///   stashed in `GuestTask::sync_result` (if any) into `storage[0]`.
///
/// A null `callback` means the callee has no callback; a null
/// `post_return` means there is no post-return function. Whether the
/// callee is async-lifted is taken from the `START_FLAG_ASYNC_CALLEE` bit
/// of `flags`.
///
/// Returns the final status packed together with the optional waitable
/// handle (`status.pack(waitable)`).
///
/// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
/// functions with the appropriate signatures for the current guest task.
/// If this is a call to an async-lowered import, the actual call may be
/// deferred and run after this function returns, in which case the pointer
/// arguments must also be valid when the call happens.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: *mut VMFuncRef,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller used an async-lowered import.
    let async_caller = storage.is_none();
    let state = self.concurrent_state_mut(store.0);
    let guest_task = state.guest_task.unwrap();
    let may_enter_after_call = state.get(guest_task)?.call_post_return_automatically();
    let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
    let param_count = usize::try_from(param_count).unwrap();
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count).unwrap();
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_task)?;
    if !callback.is_null() {
        // We're calling an async-lifted export with a callback, so store
        // the callback and related context as part of the task so we can
        // call it later when needed.
        let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
        task.callback = Some(Box::new(
            move |store, instance, runtime_instance, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per this function's contract, `callback` is a
                // valid pointer to a guest function with the callback
                // signature, and remains valid when the call happens.
                unsafe {
                    instance.call_callback::<T>(
                        store,
                        runtime_instance,
                        callback,
                        event,
                        handle,
                        may_enter_after_call,
                    )
                }
            },
        ));
    }

    let Caller::Guest {
        task: caller,
        instance: runtime_instance,
    } = &task.caller
    else {
        // As of this writing, `start_call` is only used for guest->guest
        // calls.
        unreachable!()
    };
    let caller = *caller;
    let caller_instance = *runtime_instance;

    let callee_instance = task.instance;

    // The instance flags are only passed along (and hence only toggled
    // around the call by `queue_call`) when the callee has a callback.
    let instance_flags = if callback.is_null() {
        None
    } else {
        Some(self.id().get(store.0).instance_flags(callee_instance))
    };

    // Queue the call as a "high priority" work item.
    //
    // SAFETY: The pointer validity requirements of `queue_call` are
    // forwarded from this function's own contract.
    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_task,
            callee,
            param_count,
            result_count,
            instance_flags,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = self.concurrent_state_mut(store.0);

    // Use the caller's `GuestTask::sync_call_set` to register interest in
    // the subtask...
    //
    // The set the subtask previously belonged to (if any) is remembered in
    // `old_set` and restored once waiting is over.
    let guest_waitable = Waitable::Guest(guest_task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    // ... and suspend this fiber temporarily while we wait for it to start.
    //
    // Note that we _could_ call the callee directly using the current fiber
    // rather than suspend this one, but that would make reasoning about the
    // event loop more complicated and is probably only worth doing if
    // there's a measurable performance benefit. In addition, it would mean
    // blocking the caller if the callee calls a blocking sync-lowered
    // import, and as of this writing the spec says we must not do that.
    //
    // Alternatively, the fused adapter code could be modified to call the
    // callee directly without calling a host-provided intrinsic at all (in
    // which case it would need to do its own, inline backpressure checks,
    // etc.). Again, we'd want to see a measurable performance benefit
    // before committing to such an optimization. And again, we'd need to
    // update the spec to allow that.
    let (status, waitable) = loop {
        self.suspend(
            store.0.traitobj_mut(),
            SuspendReason::Waiting { set, task: caller },
        )?;

        let state = self.concurrent_state_mut(store.0);

        // Being resumed is expected to coincide with a pending subtask
        // event for `guest_task`; anything else is treated as a bug.
        let event = Waitable::Guest(guest_task).take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            unreachable!();
        };

        log::trace!("status {status:?} for {guest_task:?}");

        if status == Status::Returned {
            // It returned, so we can stop waiting.
            break (status, None);
        } else if async_caller {
            // It hasn't returned yet, but the caller is calling via an
            // async-lowered import, so we generate a handle for the task
            // waitable and return the status.
            break (
                status,
                Some(
                    state.waitable_tables[caller_instance]
                        .insert(guest_task.rep(), WaitableState::GuestTask)?,
                ),
            );
        } else {
            // The callee hasn't returned yet, and the caller is calling via
            // a sync-lowered import, so we loop and keep waiting until the
            // callee returns.
        }
    };

    let state = self.concurrent_state_mut(store.0);

    // Put the subtask back into whatever set it belonged to before we
    // borrowed it for the wait above.
    guest_waitable.join(state, old_set)?;

    if let Some(storage) = storage {
        // The caller used a sync-lowered import to call an async-lifted
        // export, in which case the result, if any, has been stashed in
        // `GuestTask::sync_result`.
        if let Some(result) = state.get_mut(guest_task)?.sync_result.take() {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            // A sync caller never receives a handle for the subtask, so
            // it is deleted here.
            Waitable::Guest(guest_task).delete_from(state)?;
        } else {
            // This means the callee failed to call either `task.return` or
            // `task.cancel` before exiting.
            return Err(anyhow!(crate::Trap::NoAsyncResult));
        }
    }

    // Reset the current task to point to the caller as it resumes control.
    state.guest_task = Some(caller);
    log::trace!("popped current task {guest_task:?}; new task is {caller:?}");

    Ok(status.pack(waitable))
}
2498
2499 /// Wrap the specified host function in a future which will call it, passing
2500 /// it an `&Accessor<T>`.
2501 ///
2502 /// See the `Accessor` documentation for details.
2503 pub(crate) fn wrap_call<T: 'static, F, R>(
2504 self,
2505 store: StoreContextMut<T>,
2506 closure: F,
2507 ) -> impl Future<Output = Result<R>> + 'static
2508 where
2509 T: 'static,
2510 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
2511 + Send
2512 + Sync
2513 + 'static,
2514 R: Send + Sync + 'static,
2515 {
2516 let token = StoreToken::new(store);
2517 async move {
2518 let mut accessor = Accessor::new(token, Some(self));
2519 closure(&mut accessor).await
2520 }
2521 }
2522
/// Poll the specified future once on behalf of a guest->host call using an
/// async-lowered import.
///
/// If it returns `Ready`, return `Ok(None)`. Otherwise, if it returns
/// `Pending`, add it to the set of futures to be polled as part of this
/// instance's event loop until it completes, and then return
/// `Ok(Some(handle))` where `handle` is the waitable handle to return.
///
/// Whether the future returns `Ready` immediately or later, the `lower`
/// function will be used to lower the result, if any, into the guest caller's
/// stack and linear memory unless the task has been cancelled.
///
/// Parameters:
///
/// - `future`: the host computation to drive.
/// - `caller_instance`: the component instance whose waitable table
///   receives the handle when the future is still pending.
/// - `lower`: invoked with the future's successful output.
///
/// Panics if there is no current guest task.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
    lower: impl FnOnce(StoreContextMut<T>, Instance, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = self.concurrent_state_mut(store.0);
    let caller = state.guest_task.unwrap();

    // Create an abortable future which hooks calls to poll and manages call
    // context state for the future.
    let (abort_handle, future) = AbortHandle::run(async move {
        let mut future = pin!(future);
        // Holds the call context between polls. It starts out empty, so
        // nothing is pushed before the very first poll.
        let mut call_context = None;
        future::poll_fn(move |cx| {
            // Push the call context for managing any resource borrows
            // for the task.
            tls::get(|store| {
                if let Some(call_context) = call_context.take() {
                    token
                        .as_context_mut(store)
                        .0
                        .component_resource_state()
                        .0
                        .push(call_context);
                }
            });

            let result = future.as_mut().poll(cx);

            if result.is_pending() {
                // Pop the call context for managing any resource
                // borrows for the task.
                tls::get(|store| {
                    call_context = Some(
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .pop()
                            .unwrap(),
                    );
                });
            }
            result
        })
        .await
    });

    // We create a new host task even though it might complete immediately
    // (in which case we won't need to pass a waitable back to the guest).
    // If it does complete immediately, we'll remove it before we return.
    let task = state.push(HostTask::new(caller_instance, Some(abort_handle)))?;

    log::trace!("new host task child of {caller:?}: {task:?}");
    let token = StoreToken::new(store.as_context_mut());

    // Map the output of the future to a `HostTaskOutput` responsible for
    // lowering the result into the guest's stack and memory, as well as
    // notifying any waiters that the task returned.
    let mut future = Box::pin(async move {
        let result = match future.await {
            Some(result) => result,
            // Task was cancelled; nothing left to do.
            None => return HostTaskOutput::Result(Ok(())),
        };
        HostTaskOutput::Function(Box::new(move |store, instance| {
            let mut store = token.as_context_mut(store);
            // An error produced by the host future is propagated here,
            // before anything is lowered.
            lower(store.as_context_mut(), instance, result?)?;
            let state = instance.concurrent_state_mut(store.0);
            // The task has finished, so its abort handle is dropped.
            state.get_mut(task)?.abort_handle.take();
            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        }))
    });

    // Finally, poll the future. We can use a dummy `Waker` here because
    // we'll add the future to `ConcurrentState::futures` and poll it
    // automatically from the event loop if it doesn't complete immediately
    // here.
    let poll = self.set_tls(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    Ok(match poll {
        Poll::Ready(output) => {
            // It finished immediately; lower the result and delete the
            // task.
            output.consume(store.0.traitobj_mut(), self)?;
            log::trace!("delete host task {task:?} (already ready)");
            self.concurrent_state_mut(store.0).delete(task)?;
            None
        }
        Poll::Pending => {
            // It hasn't finished yet; add the future to
            // `ConcurrentState::futures` so it will be polled by the event
            // loop and allocate a waitable handle to return to the guest.
            let state = self.concurrent_state_mut(store.0);
            state.push_future(future);
            let handle = state.waitable_tables[caller_instance]
                .insert(task.rep(), WaitableState::HostTask)?;
            log::trace!(
                "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
            );
            Some(handle)
        }
    })
}
2653
2654 /// Poll the specified future until it completes on behalf of a guest->host
2655 /// call using a sync-lowered import.
2656 ///
2657 /// This is similar to `Self::first_poll` except it's for sync-lowered
2658 /// imports, meaning we don't need to handle cancellation and we can block
2659 /// the caller until the task completes, at which point the caller can
2660 /// handle lowering the result to the guest's stack and linear memory.
2661 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
2662 self,
2663 store: &mut dyn VMStore,
2664 future: impl Future<Output = Result<R>> + Send + 'static,
2665 caller_instance: RuntimeComponentInstanceIndex,
2666 ) -> Result<R> {
2667 let state = self.concurrent_state_mut(store);
2668
2669 // If there is no current guest task set, that means the host function
2670 // was registered using e.g. `LinkerInstance::func_wrap`, in which case
2671 // it should complete immediately.
2672 let Some(caller) = state.guest_task else {
2673 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
2674 Poll::Ready(result) => result,
2675 Poll::Pending => {
2676 unreachable!()
2677 }
2678 };
2679 };
2680
2681 // Save any existing result stashed in `GuestTask::result` so we can
2682 // replace it with the new result.
2683 let old_result = state
2684 .get_mut(caller)
2685 .with_context(|| format!("bad handle: {caller:?}"))?
2686 .result
2687 .take();
2688
2689 // Add a temporary host task into the table so we can track its
2690 // progress. Note that we'll never allocate a waitable handle for the
2691 // guest since we're being called synchronously.
2692 let task = state.push(HostTask::new(caller_instance, None))?;
2693
2694 log::trace!("new host task child of {caller:?}: {task:?}");
2695
2696 // Map the output of the future to a `HostTaskOutput` which will take
2697 // care of stashing the result in `GuestTask::result` and resuming this
2698 // fiber when the host task completes.
2699 let mut future = Box::pin(future.map(move |result| {
2700 HostTaskOutput::Function(Box::new(move |store, instance| {
2701 let state = instance.concurrent_state_mut(store);
2702 state.get_mut(caller)?.result = Some(Box::new(result?) as _);
2703
2704 Waitable::Host(task).set_event(
2705 state,
2706 Some(Event::Subtask {
2707 status: Status::Returned,
2708 }),
2709 )?;
2710
2711 Ok(())
2712 }))
2713 })) as HostTaskFuture;
2714
2715 // Finally, poll the future. We can use a dummy `Waker` here because
2716 // we'll add the future to `ConcurrentState::futures` and poll it
2717 // automatically from the event loop if it doesn't complete immediately
2718 // here.
2719 let poll = self.set_tls(store, || {
2720 future
2721 .as_mut()
2722 .poll(&mut Context::from_waker(&Waker::noop()))
2723 });
2724
2725 match poll {
2726 Poll::Ready(output) => {
2727 // It completed immediately; run the `HostTaskOutput` function
2728 // to stash the result and delete the task.
2729 output.consume(store, self)?;
2730 log::trace!("delete host task {task:?} (already ready)");
2731 self.concurrent_state_mut(store).delete(task)?;
2732 }
2733 Poll::Pending => {
2734 // It did not complete immediately; add it to
2735 // `ConcurrentState::futures` so it will be polled via the event
2736 // loop, then use `GuestTask::sync_call_set` to wait for the
2737 // task to complete, suspending the current fiber until it does
2738 // so.
2739 let state = self.concurrent_state_mut(store);
2740 state.push_future(future);
2741
2742 let set = state.get_mut(caller)?.sync_call_set;
2743 Waitable::Host(task).join(state, Some(set))?;
2744
2745 self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
2746 }
2747 }
2748
2749 // Retrieve and return the result.
2750 Ok(*mem::replace(
2751 &mut self.concurrent_state_mut(store).get_mut(caller)?.result,
2752 old_result,
2753 )
2754 .unwrap()
2755 .downcast()
2756 .unwrap())
2757 }
2758
2759 /// Implements the `task.return` intrinsic, lifting the result for the
2760 /// current guest task.
2761 pub(crate) fn task_return(
2762 self,
2763 store: &mut dyn VMStore,
2764 ty: TypeTupleIndex,
2765 options: OptionsIndex,
2766 storage: &[ValRaw],
2767 ) -> Result<()> {
2768 let state = self.concurrent_state_mut(store);
2769 let CanonicalOptions {
2770 string_encoding,
2771 data_model,
2772 ..
2773 } = *state.options(options);
2774 let guest_task = state.guest_task.unwrap();
2775 let lift = state
2776 .get_mut(guest_task)?
2777 .lift_result
2778 .take()
2779 .ok_or_else(|| {
2780 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2781 })?;
2782 assert!(state.get(guest_task)?.result.is_none());
2783
2784 let invalid = ty != lift.ty
2785 || string_encoding != lift.string_encoding
2786 || match data_model {
2787 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2788 Some(memory) => {
2789 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2790 let actual = self.id().get(store).runtime_memory(memory);
2791 expected != actual
2792 }
2793 // Memory not specified, meaning it didn't need to be
2794 // specified per validation, so not invalid.
2795 None => false,
2796 },
2797 // Always invalid as this isn't supported.
2798 CanonicalOptionsDataModel::Gc { .. } => true,
2799 };
2800
2801 if invalid {
2802 bail!("invalid `task.return` signature and/or options for current task");
2803 }
2804
2805 log::trace!("task.return for {guest_task:?}");
2806
2807 let result = (lift.lift)(store, self, storage)?;
2808
2809 self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2810 }
2811
2812 /// Implements the `task.cancel` intrinsic.
2813 pub(crate) fn task_cancel(
2814 self,
2815 store: &mut dyn VMStore,
2816 _caller_instance: RuntimeComponentInstanceIndex,
2817 ) -> Result<()> {
2818 let state = self.concurrent_state_mut(store);
2819 let guest_task = state.guest_task.unwrap();
2820 let task = state.get_mut(guest_task)?;
2821 if !task.cancel_sent {
2822 bail!("`task.cancel` called by task which has not been cancelled")
2823 }
2824 _ = task.lift_result.take().ok_or_else(|| {
2825 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2826 })?;
2827
2828 assert!(task.result.is_none());
2829
2830 log::trace!("task.cancel for {guest_task:?}");
2831
2832 self.task_complete(
2833 store,
2834 guest_task,
2835 Box::new(DummyResult),
2836 Status::ReturnCancelled,
2837 ValRaw::i32(0),
2838 )
2839 }
2840
2841 /// Complete the specified guest task (i.e. indicate that it has either
2842 /// returned a (possibly empty) result or cancelled itself).
2843 ///
2844 /// This will return any resource borrows and notify any current or future
2845 /// waiters that the task has completed.
2846 fn task_complete(
2847 self,
2848 store: &mut dyn VMStore,
2849 guest_task: TableId<GuestTask>,
2850 result: Box<dyn Any + Send + Sync>,
2851 status: Status,
2852 post_return_arg: ValRaw,
2853 ) -> Result<()> {
2854 if self
2855 .concurrent_state_mut(store)
2856 .get(guest_task)?
2857 .call_post_return_automatically()
2858 {
2859 let (calls, host_table, _, instance) = store
2860 .store_opaque_mut()
2861 .component_resource_state_with_instance(self);
2862 ResourceTables {
2863 calls,
2864 host_table: Some(host_table),
2865 guest: Some(instance.guest_tables()),
2866 }
2867 .exit_call()?;
2868 } else {
2869 // As of this writing, the only scenario where `call_post_return_automatically`
2870 // would be false for a `GuestTask` is for host-to-guest calls using
2871 // `[Typed]Func::call_async`, in which case the `function_index`
2872 // should be a non-`None` value.
2873 let function_index = self
2874 .concurrent_state_mut(store)
2875 .get(guest_task)?
2876 .function_index
2877 .unwrap();
2878
2879 self.id()
2880 .get_mut(store)
2881 .post_return_arg_set(function_index, post_return_arg);
2882 }
2883
2884 let state = self.concurrent_state_mut(store);
2885 let task = state.get_mut(guest_task)?;
2886
2887 if let Caller::Host { tx, .. } = &mut task.caller {
2888 if let Some(tx) = tx.take() {
2889 _ = tx.send(result);
2890 }
2891 } else {
2892 task.result = Some(result);
2893 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2894 }
2895
2896 Ok(())
2897 }
2898
2899 /// Implements the `waitable-set.wait` intrinsic.
2900 pub(crate) fn waitable_set_wait(
2901 self,
2902 store: &mut dyn VMStore,
2903 options: OptionsIndex,
2904 set: u32,
2905 payload: u32,
2906 ) -> Result<u32> {
2907 let state = self.concurrent_state_mut(store);
2908 let opts = state.options(options);
2909 let async_ = opts.async_;
2910 let caller_instance = opts.instance;
2911 let (rep, WaitableState::Set) =
2912 state.waitable_tables[caller_instance].get_mut_by_index(set)?
2913 else {
2914 bail!("invalid waitable-set handle");
2915 };
2916
2917 self.waitable_check(
2918 store,
2919 async_,
2920 WaitableCheck::Wait(WaitableCheckParams {
2921 set: TableId::new(rep),
2922 caller_instance,
2923 options,
2924 payload,
2925 }),
2926 )
2927 }
2928
2929 /// Implements the `waitable-set.poll` intrinsic.
2930 pub(crate) fn waitable_set_poll(
2931 self,
2932 store: &mut dyn VMStore,
2933 options: OptionsIndex,
2934 set: u32,
2935 payload: u32,
2936 ) -> Result<u32> {
2937 let state = self.concurrent_state_mut(store);
2938 let opts = state.options(options);
2939 let async_ = opts.async_;
2940 let caller_instance = opts.instance;
2941 let (rep, WaitableState::Set) =
2942 state.waitable_tables[caller_instance].get_mut_by_index(set)?
2943 else {
2944 bail!("invalid waitable-set handle");
2945 };
2946
2947 self.waitable_check(
2948 store,
2949 async_,
2950 WaitableCheck::Poll(WaitableCheckParams {
2951 set: TableId::new(rep),
2952 caller_instance,
2953 options,
2954 payload,
2955 }),
2956 )
2957 }
2958
2959 /// Implements the `yield` intrinsic.
2960 pub(crate) fn yield_(self, store: &mut dyn VMStore, async_: bool) -> Result<bool> {
2961 self.waitable_check(store, async_, WaitableCheck::Yield)
2962 .map(|_code| {
2963 // TODO: plumb cancellation to here:
2964 // https://github.com/bytecodealliance/wasmtime/issues/11191
2965 false
2966 })
2967 }
2968
2969 /// Helper function for the `waitable-set.wait`, `waitable-set.poll`, and
2970 /// `yield` intrinsics.
2971 fn waitable_check(
2972 self,
2973 store: &mut dyn VMStore,
2974 async_: bool,
2975 check: WaitableCheck,
2976 ) -> Result<u32> {
2977 if async_ {
2978 bail!(
2979 "todo: async `waitable-set.wait`, `waitable-set.poll`, and `yield` not yet implemented"
2980 );
2981 }
2982
2983 let guest_task = self.concurrent_state_mut(store).guest_task.unwrap();
2984
2985 let (wait, set) = match &check {
2986 WaitableCheck::Wait(params) => (true, Some(params.set)),
2987 WaitableCheck::Poll(params) => (false, Some(params.set)),
2988 WaitableCheck::Yield => (false, None),
2989 };
2990
2991 // First, suspend this fiber, allowing any other tasks to run.
2992 self.suspend(store, SuspendReason::Yielding { task: guest_task })?;
2993
2994 log::trace!("waitable check for {guest_task:?}; set {set:?}");
2995
2996 let state = self.concurrent_state_mut(store);
2997 let task = state.get(guest_task)?;
2998
2999 if wait && task.callback.is_some() {
3000 bail!("cannot call `task.wait` from async-lifted export with callback");
3001 }
3002
3003 // If we're waiting, and there are no events immediately available,
3004 // suspend the fiber until that changes.
3005 if wait {
3006 let set = set.unwrap();
3007
3008 if task.event.is_none() && state.get(set)?.ready.is_empty() {
3009 let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
3010 assert!(old.is_none());
3011
3012 self.suspend(
3013 store,
3014 SuspendReason::Waiting {
3015 set,
3016 task: guest_task,
3017 },
3018 )?;
3019 }
3020 }
3021
3022 log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
3023
3024 let result = match check {
3025 // Deliver any pending events to the guest and return.
3026 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3027 let event = self.concurrent_state_mut(store).get_event(
3028 guest_task,
3029 params.caller_instance,
3030 Some(params.set),
3031 )?;
3032
3033 let (ordinal, handle, result) = if wait {
3034 let (event, waitable) = event.unwrap();
3035 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3036 let (ordinal, result) = event.parts();
3037 (ordinal, handle, result)
3038 } else {
3039 if let Some((event, waitable)) = event {
3040 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3041 let (ordinal, result) = event.parts();
3042 (ordinal, handle, result)
3043 } else {
3044 log::trace!(
3045 "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
3046 params.set
3047 );
3048 let (ordinal, result) = Event::None.parts();
3049 (ordinal, 0, result)
3050 }
3051 };
3052 let store = store.store_opaque_mut();
3053 let options = Options::new_index(store, self, params.options);
3054 let ptr = func::validate_inbounds::<(u32, u32)>(
3055 options.memory_mut(store),
3056 &ValRaw::u32(params.payload),
3057 )?;
3058 options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3059 options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3060 Ok(ordinal)
3061 }
3062 // TODO: Check `GuestTask::event` in case it contains
3063 // `Event::Cancelled`, in which case we'll need to return that to
3064 // the guest:
3065 // https://github.com/bytecodealliance/wasmtime/issues/11191
3066 WaitableCheck::Yield => Ok(0),
3067 };
3068
3069 result
3070 }
3071
3072 /// Implements the `subtask.cancel` intrinsic.
3073 pub(crate) fn subtask_cancel(
3074 self,
3075 store: &mut dyn VMStore,
3076 caller_instance: RuntimeComponentInstanceIndex,
3077 async_: bool,
3078 task_id: u32,
3079 ) -> Result<u32> {
3080 let concurrent_state = self.concurrent_state_mut(store);
3081 let (rep, state) =
3082 concurrent_state.waitable_tables[caller_instance].get_mut_by_index(task_id)?;
3083 let (waitable, expected_caller_instance) = match state {
3084 WaitableState::HostTask => {
3085 let id = TableId::<HostTask>::new(rep);
3086 (
3087 Waitable::Host(id),
3088 concurrent_state.get(id)?.caller_instance,
3089 )
3090 }
3091 WaitableState::GuestTask => {
3092 let id = TableId::<GuestTask>::new(rep);
3093 if let Caller::Guest { instance, .. } = &concurrent_state.get(id)?.caller {
3094 (Waitable::Guest(id), *instance)
3095 } else {
3096 unreachable!()
3097 }
3098 }
3099 _ => bail!("invalid task handle: {task_id}"),
3100 };
3101 // Since waitables can neither be passed between instances nor forged,
3102 // this should never fail unless there's a bug in Wasmtime, but we check
3103 // here to be sure:
3104 assert_eq!(expected_caller_instance, caller_instance);
3105
3106 log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3107
3108 if let Waitable::Host(host_task) = waitable {
3109 if let Some(handle) = concurrent_state.get_mut(host_task)?.abort_handle.take() {
3110 handle.abort();
3111 return Ok(Status::ReturnCancelled as u32);
3112 }
3113 } else {
3114 let caller = concurrent_state.guest_task.unwrap();
3115 let guest_task = TableId::<GuestTask>::new(rep);
3116 let task = concurrent_state.get_mut(guest_task)?;
3117 if task.lower_params.is_some() {
3118 task.lower_params = None;
3119 task.lift_result = None;
3120
3121 // Not yet started; cancel and remove from pending
3122 let callee_instance = task.instance;
3123
3124 let kind = concurrent_state
3125 .instance_state(callee_instance)
3126 .pending
3127 .remove(&guest_task);
3128
3129 if kind.is_none() {
3130 bail!("`subtask.cancel` called after terminal status delivered");
3131 }
3132
3133 return Ok(Status::StartCancelled as u32);
3134 } else if task.lift_result.is_some() {
3135 // Started, but not yet returned or cancelled; send the
3136 // `CANCELLED` event
3137 task.cancel_sent = true;
3138 // Note that this might overwrite an event that was set earlier
3139 // (e.g. `Event::None` if the task is yielding, or
3140 // `Event::Cancelled` if it was already cancelled), but that's
3141 // okay -- this should supersede the previous state.
3142 task.event = Some(Event::Cancelled);
3143 if let Some(set) = task.wake_on_cancel.take() {
3144 let item = match concurrent_state
3145 .get_mut(set)?
3146 .waiting
3147 .remove(&guest_task)
3148 .unwrap()
3149 {
3150 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3151 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3152 task: guest_task,
3153 kind: GuestCallKind::DeliverEvent {
3154 instance,
3155 set: None,
3156 },
3157 }),
3158 };
3159 concurrent_state.push_high_priority(item);
3160
3161 self.suspend(store, SuspendReason::Yielding { task: caller })?;
3162 }
3163
3164 let concurrent_state = self.concurrent_state_mut(store);
3165 let task = concurrent_state.get_mut(guest_task)?;
3166 if task.lift_result.is_some() {
3167 // Still not yet returned or cancelled; if `async_`, return
3168 // `BLOCKED`; otherwise wait
3169 if async_ {
3170 return Ok(BLOCKED);
3171 } else {
3172 let waitable = Waitable::Guest(guest_task);
3173 let old_set = waitable.common(concurrent_state)?.set;
3174 let set = concurrent_state.get_mut(caller)?.sync_call_set;
3175 waitable.join(concurrent_state, Some(set))?;
3176
3177 self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
3178
3179 waitable.join(self.concurrent_state_mut(store), old_set)?;
3180 }
3181 }
3182 }
3183 }
3184
3185 let event = waitable.take_event(self.concurrent_state_mut(store))?;
3186 if let Some(Event::Subtask {
3187 status: status @ (Status::Returned | Status::ReturnCancelled),
3188 }) = event
3189 {
3190 Ok(status as u32)
3191 } else {
3192 bail!("`subtask.cancel` called after terminal status delivered");
3193 }
3194 }
3195
3196 /// Configures TLS state so `store` will be available via `tls::get` within
3197 /// the closure `f` provided.
3198 ///
3199 /// This is used to ensure that `Future::poll`, which doesn't take a `store`
3200 /// parameter, is able to get access to the `store` during future poll
3201 /// methods.
3202 fn set_tls<R>(self, store: &mut dyn VMStore, f: impl FnOnce() -> R) -> R {
3203 struct Reset<'a>(&'a mut dyn VMStore, Option<ComponentInstanceId>);
3204
3205 impl Drop for Reset<'_> {
3206 fn drop(&mut self) {
3207 self.0.concurrent_async_state_mut().current_instance = self.1;
3208 }
3209 }
3210 let prev = mem::replace(
3211 &mut store.concurrent_async_state_mut().current_instance,
3212 Some(self.id().instance()),
3213 );
3214 let reset = Reset(store, prev);
3215
3216 tls::set(reset.0, f)
3217 }
3218
3219 /// Convenience function to reduce boilerplate.
3220 pub(crate) fn concurrent_state_mut<'a>(
3221 &self,
3222 store: &'a mut StoreOpaque,
3223 ) -> &'a mut ConcurrentState {
3224 self.id().get_mut(store).concurrent_state_mut()
3225 }
3226}
3227
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is used only when at least one of them is async.
    ///
    /// Note that `result_count` doubles as a marker: the `StoreInner`
    /// implementation treats the values `PREPARE_ASYNC_NO_RESULT` and
    /// `PREPARE_ASYNC_WITH_RESULT` as indicating an async caller, and any
    /// other value as the result count of a sync caller.
    ///
    /// # Safety
    ///
    /// See the trait-level safety notes. Additionally, `storage` must point
    /// to at least `storage_len` initialized `ValRaw` items.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// # Safety
    ///
    /// See the trait-level safety notes. Additionally, `storage` must point
    /// to at least `storage_len` items.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    ///
    /// # Safety
    ///
    /// See the trait-level safety notes.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    ///
    /// `future` is the guest's handle and `address` the guest address
    /// involved in the write; the return value is the encoded result code.
    fn future_write(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    ///
    /// `future` is the guest's handle and `address` the guest address
    /// involved in the read; the return value is the encoded result code.
    fn future_read(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    ///
    /// `count` is the number of items involved, starting at guest address
    /// `address`; the return value is the encoded result code.
    fn stream_write(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    ///
    /// `count` is the number of items involved, starting at guest address
    /// `address`; the return value is the encoded result code.
    fn stream_read(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    ///
    /// `err_ctx_handle` names the error context and `debug_msg_address` is
    /// the guest address associated with the debug message.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;
}
3377
3378/// SAFETY: See trait docs.
3379impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3380 unsafe fn prepare_call(
3381 &mut self,
3382 instance: Instance,
3383 memory: *mut VMMemoryDefinition,
3384 start: *mut VMFuncRef,
3385 return_: *mut VMFuncRef,
3386 caller_instance: RuntimeComponentInstanceIndex,
3387 callee_instance: RuntimeComponentInstanceIndex,
3388 task_return_type: TypeTupleIndex,
3389 string_encoding: u8,
3390 result_count_or_max_if_async: u32,
3391 storage: *mut ValRaw,
3392 storage_len: usize,
3393 ) -> Result<()> {
3394 // SAFETY: The `wasmtime_cranelift`-generated code that calls
3395 // this method will have ensured that `storage` is a valid
3396 // pointer containing at least `storage_len` items.
3397 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3398
3399 unsafe {
3400 instance.prepare_call(
3401 StoreContextMut(self),
3402 start,
3403 return_,
3404 caller_instance,
3405 callee_instance,
3406 task_return_type,
3407 memory,
3408 string_encoding,
3409 match result_count_or_max_if_async {
3410 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3411 params,
3412 has_result: false,
3413 },
3414 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3415 params,
3416 has_result: true,
3417 },
3418 result_count => CallerInfo::Sync {
3419 params,
3420 result_count,
3421 },
3422 },
3423 )
3424 }
3425 }
3426
3427 unsafe fn sync_start(
3428 &mut self,
3429 instance: Instance,
3430 callback: *mut VMFuncRef,
3431 callee: *mut VMFuncRef,
3432 param_count: u32,
3433 storage: *mut MaybeUninit<ValRaw>,
3434 storage_len: usize,
3435 ) -> Result<()> {
3436 unsafe {
3437 instance
3438 .start_call(
3439 StoreContextMut(self),
3440 callback,
3441 ptr::null_mut(),
3442 callee,
3443 param_count,
3444 1,
3445 START_FLAG_ASYNC_CALLEE,
3446 // SAFETY: The `wasmtime_cranelift`-generated code that calls
3447 // this method will have ensured that `storage` is a valid
3448 // pointer containing at least `storage_len` items.
3449 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3450 )
3451 .map(drop)
3452 }
3453 }
3454
3455 unsafe fn async_start(
3456 &mut self,
3457 instance: Instance,
3458 callback: *mut VMFuncRef,
3459 post_return: *mut VMFuncRef,
3460 callee: *mut VMFuncRef,
3461 param_count: u32,
3462 result_count: u32,
3463 flags: u32,
3464 ) -> Result<u32> {
3465 unsafe {
3466 instance.start_call(
3467 StoreContextMut(self),
3468 callback,
3469 post_return,
3470 callee,
3471 param_count,
3472 result_count,
3473 flags,
3474 None,
3475 )
3476 }
3477 }
3478
3479 fn future_write(
3480 &mut self,
3481 instance: Instance,
3482 ty: TypeFutureTableIndex,
3483 options: OptionsIndex,
3484 future: u32,
3485 address: u32,
3486 ) -> Result<u32> {
3487 instance
3488 .guest_write(
3489 StoreContextMut(self),
3490 TableIndex::Future(ty),
3491 options,
3492 None,
3493 future,
3494 address,
3495 1,
3496 )
3497 .map(|result| result.encode())
3498 }
3499
3500 fn future_read(
3501 &mut self,
3502 instance: Instance,
3503 ty: TypeFutureTableIndex,
3504 options: OptionsIndex,
3505 future: u32,
3506 address: u32,
3507 ) -> Result<u32> {
3508 instance
3509 .guest_read(
3510 StoreContextMut(self),
3511 TableIndex::Future(ty),
3512 options,
3513 None,
3514 future,
3515 address,
3516 1,
3517 )
3518 .map(|result| result.encode())
3519 }
3520
3521 fn stream_write(
3522 &mut self,
3523 instance: Instance,
3524 ty: TypeStreamTableIndex,
3525 options: OptionsIndex,
3526 stream: u32,
3527 address: u32,
3528 count: u32,
3529 ) -> Result<u32> {
3530 instance
3531 .guest_write(
3532 StoreContextMut(self),
3533 TableIndex::Stream(ty),
3534 options,
3535 None,
3536 stream,
3537 address,
3538 count,
3539 )
3540 .map(|result| result.encode())
3541 }
3542
3543 fn stream_read(
3544 &mut self,
3545 instance: Instance,
3546 ty: TypeStreamTableIndex,
3547 options: OptionsIndex,
3548 stream: u32,
3549 address: u32,
3550 count: u32,
3551 ) -> Result<u32> {
3552 instance
3553 .guest_read(
3554 StoreContextMut(self),
3555 TableIndex::Stream(ty),
3556 options,
3557 None,
3558 stream,
3559 address,
3560 count,
3561 )
3562 .map(|result| result.encode())
3563 }
3564
3565 fn future_drop_writable(
3566 &mut self,
3567 instance: Instance,
3568 ty: TypeFutureTableIndex,
3569 writer: u32,
3570 ) -> Result<()> {
3571 instance.guest_drop_writable(StoreContextMut(self), TableIndex::Future(ty), writer)
3572 }
3573
3574 fn flat_stream_write(
3575 &mut self,
3576 instance: Instance,
3577 ty: TypeStreamTableIndex,
3578 options: OptionsIndex,
3579 payload_size: u32,
3580 payload_align: u32,
3581 stream: u32,
3582 address: u32,
3583 count: u32,
3584 ) -> Result<u32> {
3585 instance
3586 .guest_write(
3587 StoreContextMut(self),
3588 TableIndex::Stream(ty),
3589 options,
3590 Some(FlatAbi {
3591 size: payload_size,
3592 align: payload_align,
3593 }),
3594 stream,
3595 address,
3596 count,
3597 )
3598 .map(|result| result.encode())
3599 }
3600
3601 fn flat_stream_read(
3602 &mut self,
3603 instance: Instance,
3604 ty: TypeStreamTableIndex,
3605 options: OptionsIndex,
3606 payload_size: u32,
3607 payload_align: u32,
3608 stream: u32,
3609 address: u32,
3610 count: u32,
3611 ) -> Result<u32> {
3612 instance
3613 .guest_read(
3614 StoreContextMut(self),
3615 TableIndex::Stream(ty),
3616 options,
3617 Some(FlatAbi {
3618 size: payload_size,
3619 align: payload_align,
3620 }),
3621 stream,
3622 address,
3623 count,
3624 )
3625 .map(|result| result.encode())
3626 }
3627
3628 fn stream_drop_writable(
3629 &mut self,
3630 instance: Instance,
3631 ty: TypeStreamTableIndex,
3632 writer: u32,
3633 ) -> Result<()> {
3634 instance.guest_drop_writable(StoreContextMut(self), TableIndex::Stream(ty), writer)
3635 }
3636
3637 fn error_context_debug_message(
3638 &mut self,
3639 instance: Instance,
3640 ty: TypeComponentLocalErrorContextTableIndex,
3641 options: OptionsIndex,
3642 err_ctx_handle: u32,
3643 debug_msg_address: u32,
3644 ) -> Result<()> {
3645 instance.error_context_debug_message(
3646 StoreContextMut(self),
3647 ty,
3648 options,
3649 err_ctx_handle,
3650 debug_msg_address,
3651 )
3652 }
3653}
3654
/// Represents the output of a host task or background task.
///
/// Use `HostTaskOutput::consume` to turn either variant into a plain
/// `Result<()>`.
pub(crate) enum HostTaskOutput {
    /// A plain result which needs no further processing.
    Result(Result<()>),
    /// A function to be run after the future completes (e.g. post-processing
    /// which requires access to the store and instance). The value it returns
    /// becomes the result of the task.
    Function(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>),
}
3663
3664impl HostTaskOutput {
3665 /// Retrieve the result of the host or background task, running the
3666 /// post-processing function if present.
3667 fn consume(self, store: &mut dyn VMStore, instance: Instance) -> Result<()> {
3668 match self {
3669 Self::Function(fun) => fun(store, instance),
3670 Self::Result(result) => result,
3671 }
3672 }
3673}
3674
/// A pinned, boxed, `Send + 'static` future which resolves to a
/// `HostTaskOutput`.
type HostTaskFuture = Pin<Box<dyn Future<Output = HostTaskOutput> + Send + 'static>>;
3676
/// Represents the state of a pending host task.
struct HostTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// The component instance recorded as the caller when this task was
    /// created; `subtask.cancel` asserts that it matches the instance invoking
    /// the intrinsic.
    caller_instance: RuntimeComponentInstanceIndex,
    /// If present, `subtask.cancel` takes this handle and uses it to abort the
    /// task.
    abort_handle: Option<AbortHandle>,
}
3683
3684impl HostTask {
3685 fn new(
3686 caller_instance: RuntimeComponentInstanceIndex,
3687 abort_handle: Option<AbortHandle>,
3688 ) -> Self {
3689 Self {
3690 common: WaitableCommon::default(),
3691 caller_instance,
3692 abort_handle,
3693 }
3694 }
3695}
3696
impl TableDebug for HostTask {
    /// Name reported for this type, `"HostTask"`.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
3702
/// Type-erased closure stored in `GuestTask::callback`, used to call the
/// callback function of an async-lifted export.
///
/// It receives the store, the `Instance`, a runtime component instance index,
/// an `Event`, and a `u32`, and returns a `u32` code.
// NOTE(review): the meaning of the trailing `u32` argument and of the returned
// code is not visible in this part of the file -- confirm against the call
// site before relying on it.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, Instance, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
3709
/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result. It is taken (and
        /// therefore used at most once) when the task completes.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, remove the task from the concurrent state that owns it
        /// automatically after it completes.
        remove_task_automatically: bool,
        /// If true, call `post-return` function (if any) automatically.
        call_post_return_automatically: bool,
    },
    /// Another guest task called the guest task.
    Guest {
        /// The id of the caller.
        task: TableId<GuestTask>,
        /// The instance to use to enforce reentrance rules.
        ///
        /// Note that this might not be the same as the instance the caller task
        /// started executing in given that one or more synchronous guest->guest
        /// calls may have occurred involving multiple instances.
        instance: RuntimeComponentInstanceIndex,
    },
}
3734
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// Closure which lifts the result out of the raw values passed to
    /// `task.return`.
    lift: RawLift,
    /// The result type which `task.return` must be called with.
    ty: TypeTupleIndex,
    /// The memory which `task.return` must specify; `None` is compared as a
    /// null pointer.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// The string encoding which `task.return` must specify.
    string_encoding: StringEncoding,
}
3743
/// Represents a pending guest task.
struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// `subtask.cancel` treats a task for which this is still present as not
    /// yet started.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Taken by `task.return` and `task.cancel`, and cleared when the task is
    /// cancelled before starting, so `None` means one of those has already
    /// happened.
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// A place to stash the call context for managing resource borrows while
    /// switching between guest tasks.
    call_context: Option<CallContext>,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: Option<Option<ValRaw>>,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    ///
    /// Set by `subtask.cancel` when it sends `Event::Cancelled` to a task
    /// which has started but not yet returned or cancelled.
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// Pending guest subtasks created by this task (directly or indirectly).
    ///
    /// This is used to re-parent subtasks which are still running when their
    /// parent task is disposed.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
    /// The instance to which the exported function for this guest task belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeComponentInstanceIndex,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// If present, indicates that the task is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The `ExportIndex` of the guest function being called, if known.
    function_index: Option<ExportIndex>,
    /// Whether or not the task has exited.
    exited: bool,
}
3799
3800impl GuestTask {
3801 fn new(
3802 state: &mut ConcurrentState,
3803 lower_params: RawLower,
3804 lift_result: LiftResult,
3805 caller: Caller,
3806 callback: Option<CallbackFn>,
3807 component_instance: RuntimeComponentInstanceIndex,
3808 ) -> Result<Self> {
3809 let sync_call_set = state.push(WaitableSet::default())?;
3810
3811 Ok(Self {
3812 common: WaitableCommon::default(),
3813 lower_params: Some(lower_params),
3814 lift_result: Some(lift_result),
3815 result: None,
3816 callback,
3817 caller,
3818 call_context: Some(CallContext::default()),
3819 sync_result: None,
3820 cancel_sent: false,
3821 starting_sent: false,
3822 context: [0u32; 2],
3823 subtasks: HashSet::new(),
3824 sync_call_set,
3825 instance: component_instance,
3826 event: None,
3827 wake_on_cancel: None,
3828 function_index: None,
3829 exited: false,
3830 })
3831 }
3832
3833 /// Dispose of this guest task, reparenting any pending subtasks to the
3834 /// caller.
3835 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3836 // If there are not-yet-delivered completion events for subtasks in
3837 // `self.sync_call_set`, recursively dispose of those subtasks as well.
3838 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3839 if let Some(Event::Subtask {
3840 status: Status::Returned | Status::ReturnCancelled,
3841 }) = waitable.common(state)?.event
3842 {
3843 waitable.delete_from(state)?;
3844 }
3845 }
3846
3847 state.delete(self.sync_call_set)?;
3848
3849 // Reparent any pending subtasks to the caller.
3850 if let Caller::Guest {
3851 task,
3852 instance: runtime_instance,
3853 } = &self.caller
3854 {
3855 let task_mut = state.get_mut(*task)?;
3856 let present = task_mut.subtasks.remove(&me);
3857 assert!(present);
3858
3859 for subtask in &self.subtasks {
3860 task_mut.subtasks.insert(*subtask);
3861 }
3862
3863 for subtask in &self.subtasks {
3864 state.get_mut(*subtask)?.caller = Caller::Guest {
3865 task: *task,
3866 instance: *runtime_instance,
3867 };
3868 }
3869 } else {
3870 for subtask in &self.subtasks {
3871 state.get_mut(*subtask)?.caller = Caller::Host {
3872 tx: None,
3873 remove_task_automatically: true,
3874 call_post_return_automatically: true,
3875 };
3876 }
3877 }
3878
3879 Ok(())
3880 }
3881
3882 fn call_post_return_automatically(&self) -> bool {
3883 matches!(
3884 self.caller,
3885 Caller::Guest { .. }
3886 | Caller::Host {
3887 call_post_return_automatically: true,
3888 ..
3889 }
3890 )
3891 }
3892}
3893
impl TableDebug for GuestTask {
    /// Human-readable type name reported for `GuestTask` table entries.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
3899
/// Represents state common to all kinds of waitables.
///
/// Host tasks, guest tasks, and transmit handles each embed one of these as
/// their `common` field; `Waitable::common` provides uniform access to it.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    set: Option<TableId<WaitableSet>>,
}
3908
/// Represents a Component Model Async `waitable`.
///
/// This is a cheap, copyable handle wrapping the table id of the underlying
/// object.  It is `Ord` so that it can be stored in the `BTreeSet` used by
/// `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
3919
3920impl Waitable {
3921 /// Retrieve the `Waitable` corresponding to the specified guest-visible
3922 /// handle.
3923 fn from_instance(
3924 state: &mut ConcurrentState,
3925 caller_instance: RuntimeComponentInstanceIndex,
3926 waitable: u32,
3927 ) -> Result<Self> {
3928 let (waitable, state) =
3929 state.waitable_tables[caller_instance].get_mut_by_index(waitable)?;
3930
3931 Ok(match state {
3932 WaitableState::HostTask => Waitable::Host(TableId::new(waitable)),
3933 WaitableState::GuestTask => Waitable::Guest(TableId::new(waitable)),
3934 WaitableState::Stream(..) | WaitableState::Future(..) => {
3935 Waitable::Transmit(TableId::new(waitable))
3936 }
3937 _ => bail!("invalid waitable handle"),
3938 })
3939 }
3940
3941 /// Retrieve the host-visible identifier for this `Waitable`.
3942 fn rep(&self) -> u32 {
3943 match self {
3944 Self::Host(id) => id.rep(),
3945 Self::Guest(id) => id.rep(),
3946 Self::Transmit(id) => id.rep(),
3947 }
3948 }
3949
3950 /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
3951 /// remove it from any set it may currently belong to (when `set` is
3952 /// `None`).
3953 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3954 log::trace!("waitable {self:?} join set {set:?}",);
3955
3956 let old = mem::replace(&mut self.common(state)?.set, set);
3957
3958 if let Some(old) = old {
3959 match *self {
3960 Waitable::Host(id) => state.remove_child(id, old),
3961 Waitable::Guest(id) => state.remove_child(id, old),
3962 Waitable::Transmit(id) => state.remove_child(id, old),
3963 }?;
3964
3965 state.get_mut(old)?.ready.remove(self);
3966 }
3967
3968 if let Some(set) = set {
3969 match *self {
3970 Waitable::Host(id) => state.add_child(id, set),
3971 Waitable::Guest(id) => state.add_child(id, set),
3972 Waitable::Transmit(id) => state.add_child(id, set),
3973 }?;
3974
3975 if self.common(state)?.event.is_some() {
3976 self.mark_ready(state)?;
3977 }
3978 }
3979
3980 Ok(())
3981 }
3982
3983 /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
3984 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3985 Ok(match self {
3986 Self::Host(id) => &mut state.get_mut(*id)?.common,
3987 Self::Guest(id) => &mut state.get_mut(*id)?.common,
3988 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3989 })
3990 }
3991
3992 /// Set or clear the pending event for this waitable and either deliver it
3993 /// to the first waiter, if any, or mark it as ready to be delivered to the
3994 /// next waiter that arrives.
3995 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3996 log::trace!("set event for {self:?}: {event:?}");
3997 self.common(state)?.event = event;
3998 self.mark_ready(state)
3999 }
4000
4001 /// Take the pending event from this waitable, leaving `None` in its place.
4002 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4003 let common = self.common(state)?;
4004 let event = common.event.take();
4005 if let Some(set) = self.common(state)?.set {
4006 state.get_mut(set)?.ready.remove(self);
4007 }
4008 Ok(event)
4009 }
4010
4011 /// Deliver the current event for this waitable to the first waiter, if any,
4012 /// or else mark it as ready to be delivered to the next waiter that
4013 /// arrives.
4014 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4015 if let Some(set) = self.common(state)?.set {
4016 state.get_mut(set)?.ready.insert(*self);
4017 if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
4018 let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
4019 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4020
4021 let item = match mode {
4022 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4023 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4024 task,
4025 kind: GuestCallKind::DeliverEvent {
4026 instance,
4027 set: Some(set),
4028 },
4029 }),
4030 };
4031 state.push_high_priority(item);
4032 }
4033 }
4034 Ok(())
4035 }
4036
4037 /// Handle the imminent delivery of the specified event, e.g. by updating
4038 /// the state of the stream or future.
4039 fn on_delivery(&self, state: &mut ConcurrentState, event: Event) {
4040 match event {
4041 Event::FutureRead {
4042 pending: Some((ty, handle)),
4043 ..
4044 }
4045 | Event::FutureWrite {
4046 pending: Some((ty, handle)),
4047 ..
4048 } => {
4049 let runtime_instance = state.component.types()[ty].instance;
4050 let (rep, WaitableState::Future(actual_ty, state)) = state.waitable_tables
4051 [runtime_instance]
4052 .get_mut_by_index(handle)
4053 .unwrap()
4054 else {
4055 unreachable!()
4056 };
4057 assert_eq!(*actual_ty, ty);
4058 assert_eq!(rep, self.rep());
4059 assert_eq!(*state, StreamFutureState::Busy);
4060 *state = match event {
4061 Event::FutureRead { .. } => StreamFutureState::Read { done: false },
4062 Event::FutureWrite { .. } => StreamFutureState::Write { done: false },
4063 _ => unreachable!(),
4064 };
4065 }
4066 Event::StreamRead {
4067 pending: Some((ty, handle)),
4068 code,
4069 }
4070 | Event::StreamWrite {
4071 pending: Some((ty, handle)),
4072 code,
4073 } => {
4074 let runtime_instance = state.component.types()[ty].instance;
4075 let (rep, WaitableState::Stream(actual_ty, state)) = state.waitable_tables
4076 [runtime_instance]
4077 .get_mut_by_index(handle)
4078 .unwrap()
4079 else {
4080 unreachable!()
4081 };
4082 assert_eq!(*actual_ty, ty);
4083 assert_eq!(rep, self.rep());
4084 assert_eq!(*state, StreamFutureState::Busy);
4085 let done = matches!(code, ReturnCode::Dropped(_));
4086 *state = match event {
4087 Event::StreamRead { .. } => StreamFutureState::Read { done },
4088 Event::StreamWrite { .. } => StreamFutureState::Write { done },
4089 _ => unreachable!(),
4090 };
4091 }
4092 _ => {}
4093 }
4094 }
4095
4096 /// Remove this waitable from the instance's rep table.
4097 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4098 match self {
4099 Self::Host(task) => {
4100 log::trace!("delete host task {task:?}");
4101 state.delete(*task)?;
4102 }
4103 Self::Guest(task) => {
4104 log::trace!("delete guest task {task:?}");
4105 state.delete(*task)?.dispose(state, *task)?;
4106 }
4107 Self::Transmit(task) => {
4108 state.delete(*task)?;
4109 }
4110 }
4111
4112 Ok(())
4113 }
4114}
4115
4116impl fmt::Debug for Waitable {
4117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4118 match self {
4119 Self::Host(id) => write!(f, "{id:?}"),
4120 Self::Guest(id) => write!(f, "{id:?}"),
4121 Self::Transmit(id) => write!(f, "{id:?}"),
4122 }
4123 }
4124}
4125
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// Which guest tasks are currently waiting on this set, if any.
    ///
    /// `Waitable::mark_ready` wakes waiters via `pop_first`, i.e. in key
    /// order.
    waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
}
4134
impl TableDebug for WaitableSet {
    /// Human-readable type name reported for `WaitableSet` table entries.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4140
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure writes the lowered parameters into the provided `ValRaw` slice.
type RawLower = Box<
    dyn FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>;

/// Type-erased closure to lift the result for a guest task.
///
/// The closure receives the raw result values and returns the lifted result
/// as a boxed `Any` (the same type as `LiftedResult`).
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
        + Send
        + Sync,
>;

/// Type erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
// NOTE(review): no `LiftFn` is visible in this part of the file; this may be
// a stale name for `RawLift` -- confirm.
struct DummyResult;
4161
4162/// Represents the state of a currently executing fiber which has been resumed
4163/// via `self::poll_fn`.
4164pub(crate) struct AsyncState {
4165 /// The current instance being polled, if any, which is used to perform
4166 /// checks to ensure that futures are always polled within the correct
4167 /// instance.
4168 current_instance: Option<ComponentInstanceId>,
4169}
4170
4171impl Default for AsyncState {
4172 fn default() -> Self {
4173 Self {
4174 current_instance: None,
4175 }
4176 }
4177}
4178
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
struct InstanceState {
    /// Whether backpressure is set for this instance
    backpressure: bool,
    /// Whether this instance can be entered
    do_not_enter: bool,
    /// Pending calls for this instance which require `Self::backpressure` to be
    /// `false` and/or `Self::do_not_enter` to be `false` before they can
    /// proceed.
    pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
}
4190
/// Represents the Component Model Async state of a top-level component instance
/// (i.e. a `super::ComponentInstance`).
pub struct ConcurrentState {
    /// The currently running guest task, if any.
    guest_task: Option<TableId<GuestTask>>,
    /// The set of pending host and background tasks, if any.
    ///
    /// We must wrap this in a `Mutex` to ensure that `ComponentInstance` and
    /// `Store` satisfy a `Sync` bound, but it can't actually be accessed from
    /// more than one thread at a time.
    ///
    /// See `ComponentInstance::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: Mutex<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: Table,
    /// Per (sub-)component instance states.
    ///
    /// See `InstanceState` for details and note that this map is lazily
    /// populated as needed.
    // TODO: this can and should be a `PrimaryMap`
    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
    /// Tables for tracking per-(sub-)component waitable handles and their
    /// states.
    waitable_tables: PrimaryMap<RuntimeComponentInstanceIndex, StateTable<WaitableState>>,
    /// The "high priority" work queue for this instance's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this instance's event loop.
    low_priority: Vec<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// (Sub)Component specific error context tracking
    ///
    /// At the component level, only the number of references (`usize`) to a given error context is tracked,
    /// with state related to the error context being held at the component model level, in concurrent
    /// state.
    ///
    /// The state tables in the (sub)component local tracking must contain a pointer into the global
    /// error context lookups in order to ensure that in contexts where only the local reference is present
    /// the global state can still be maintained/updated.
    error_context_tables:
        PrimaryMap<TypeComponentLocalErrorContextTableIndex, StateTable<LocalErrorContextRefCount>>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
    /// (sub)component ref counts as tracked by `error_context_tables`, for
    /// example when the host holds one or more references to error contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// Mirror of type information in `ComponentInstance`, placed here for
    /// convenience at the cost of an extra `Arc` clone.
    component: Component,
}
4262
4263impl ConcurrentState {
4264 pub(crate) fn new(component: &Component) -> Self {
4265 let num_waitable_tables = component.env_component().num_runtime_component_instances;
4266 let num_error_context_tables = component.env_component().num_error_context_tables;
4267 let mut waitable_tables =
4268 PrimaryMap::with_capacity(usize::try_from(num_waitable_tables).unwrap());
4269 for _ in 0..num_waitable_tables {
4270 waitable_tables.push(StateTable::default());
4271 }
4272
4273 let mut error_context_tables = PrimaryMap::<
4274 TypeComponentLocalErrorContextTableIndex,
4275 StateTable<LocalErrorContextRefCount>,
4276 >::with_capacity(num_error_context_tables);
4277 for _ in 0..num_error_context_tables {
4278 error_context_tables.push(StateTable::default());
4279 }
4280
4281 Self {
4282 guest_task: None,
4283 table: Table::new(),
4284 futures: Mutex::new(Some(FuturesUnordered::new())),
4285 instance_states: HashMap::new(),
4286 waitable_tables,
4287 high_priority: Vec::new(),
4288 low_priority: Vec::new(),
4289 suspend_reason: None,
4290 worker: None,
4291 worker_item: None,
4292 error_context_tables,
4293 global_error_context_ref_counts: BTreeMap::new(),
4294 component: component.clone(),
4295 }
4296 }
4297
4298 /// Take ownership of any fibers and futures owned by this object.
4299 ///
4300 /// This should be used when disposing of the `Store` containing this object
4301 /// in order to gracefully resolve any and all fibers using
4302 /// `StoreFiber::dispose`. This is necessary to avoid possible
4303 /// use-after-free bugs due to fibers which may still have access to the
4304 /// `Store`.
4305 ///
4306 /// Additionally, the futures collected with this function should be dropped
4307 /// within a `tls::set` call, which will ensure than any futures closing
4308 /// over an `&Accessor` will have access to the store when dropped, allowing
4309 /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4310 /// panicking.
4311 ///
4312 /// Note that this will leave the object in an inconsistent and unusable
4313 /// state, so it should only be used just prior to dropping it.
4314 pub(crate) fn take_fibers_and_futures(
4315 &mut self,
4316 fibers: &mut Vec<StoreFiber<'static>>,
4317 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4318 ) {
4319 for entry in self.table.iter_mut() {
4320 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4321 for mode in mem::take(&mut set.waiting).into_values() {
4322 if let WaitMode::Fiber(fiber) = mode {
4323 fibers.push(fiber);
4324 }
4325 }
4326 }
4327 }
4328
4329 if let Some(fiber) = self.worker.take() {
4330 fibers.push(fiber);
4331 }
4332
4333 let mut take_items = |list| {
4334 for item in mem::take(list) {
4335 match item {
4336 WorkItem::ResumeFiber(fiber) => {
4337 fibers.push(fiber);
4338 }
4339 WorkItem::PushFuture(future) => {
4340 self.futures
4341 .get_mut()
4342 .unwrap()
4343 .as_mut()
4344 .unwrap()
4345 .push(future.into_inner().unwrap());
4346 }
4347 _ => {}
4348 }
4349 }
4350 };
4351
4352 take_items(&mut self.high_priority);
4353 take_items(&mut self.low_priority);
4354
4355 if let Some(them) = self.futures.get_mut().unwrap().take() {
4356 futures.push(them);
4357 }
4358 }
4359}
4360
4361/// Provide a type hint to compiler about the shape of a parameter lower
4362/// closure.
4363fn for_any_lower<
4364 F: FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4365>(
4366 fun: F,
4367) -> F {
4368 fun
4369}
4370
4371/// Provide a type hint to compiler about the shape of a result lift closure.
4372fn for_any_lift<
4373 F: FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4374 + Send
4375 + Sync,
4376>(
4377 fun: F,
4378) -> F {
4379 fun
4380}
4381
4382/// Wrap the specified future in a `poll_fn` which asserts that the future is
4383/// only polled from the event loop of the specified `Instance`.
4384///
4385/// See `Instance::run_concurrent` for details.
4386fn checked<F: Future + Send + 'static>(
4387 instance: Instance,
4388 fut: F,
4389) -> impl Future<Output = F::Output> + Send + 'static {
4390 async move {
4391 let mut fut = pin!(fut);
4392 future::poll_fn(move |cx| {
4393 let message = "\
4394 `Future`s which depend on asynchronous component tasks, streams, or \
4395 futures to complete may only be polled from the event loop of the \
4396 instance from which they originated. Please use \
4397 `Instance::{run_concurrent,spawn}` to poll or await them.\
4398 ";
4399 tls::try_get(|store| {
4400 let matched = match store {
4401 tls::TryGet::Some(store) => {
4402 let a = store.concurrent_async_state_mut().current_instance;
4403 a == Some(instance.id().instance())
4404 }
4405 tls::TryGet::Taken | tls::TryGet::None => false,
4406 };
4407
4408 if !matched {
4409 panic!("{message}")
4410 }
4411 });
4412 fut.as_mut().poll(cx)
4413 })
4414 .await
4415 }
4416}
4417
4418/// Assert that `Instance::run_concurrent` has not been called from within an
4419/// instance's event loop.
4420fn check_recursive_run() {
4421 tls::try_get(|store| {
4422 if !matches!(store, tls::TryGet::None) {
4423 panic!("Recursive `Instance::run_concurrent` calls not supported")
4424 }
4425 });
4426}
4427
/// Split a packed callback return value into `(low, high)`, where `low` is
/// the least-significant four bits of `code` and `high` is everything above
/// them, shifted down.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
4431
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// The (sub-)component instance making the call.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Index of the canonical options for the call.
    options: OptionsIndex,
    // NOTE(review): presumably the guest address at which the event payload
    // is written -- confirm against `waitable_check`.
    payload: u32,
}
4441
/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
enum WaitableCheck {
    /// A `waitable-set.wait` call with its parameters.
    Wait(WaitableCheckParams),
    /// A `waitable-set.poll` call with its parameters.
    Poll(WaitableCheckParams),
    /// A check which carries no parameters.
    Yield,
}
4448
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// Pass this to `queue_call` to enqueue the call in the instance's event
/// loop.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest task created by `prepare_call`
    task: TableId<GuestTask>,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties this call to its result type `R` without storing an `R`.
    _phantom: PhantomData<R>,
}
4462
impl<R> PreparedCall<R> {
    /// Get a copy of the `TaskId` for this `PreparedCall`.
    ///
    /// The returned id carries the same `handle` and `task` as `self`.
    pub(crate) fn task_id(&self) -> TaskId {
        TaskId {
            handle: self.handle,
            task: self.task,
        }
    }
}
4472
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// The guest export the task was prepared for.
    handle: Func,
    /// The table id of the guest task itself.
    task: TableId<GuestTask>,
}
4478
4479impl TaskId {
4480 /// Remove the specified task from the concurrent state to which it belongs.
4481 ///
4482 /// This must be used with care to avoid use-after-delete or double-delete
4483 /// bugs. Specifically, it should only be called on tasks created with the
4484 /// `remove_task_automatically` parameter to `prepare_call` set to `false`,
4485 /// which tells the runtime that the caller is responsible for removing the
4486 /// task from the state; otherwise, it will be removed automatically. Also,
4487 /// it should only be called once for a given task, and only after either
4488 /// the task has completed or the instance has trapped.
4489 pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4490 Waitable::Guest(self.task).delete_from(self.handle.instance().concurrent_state_mut(store.0))
4491 }
4492}
4493
4494/// Prepare a call to the specified exported Wasm function, providing functions
4495/// for lowering the parameters and lifting the result.
4496///
4497/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4498/// loop, use `queue_call`.
4499pub(crate) fn prepare_call<T, R>(
4500 mut store: StoreContextMut<T>,
4501 handle: Func,
4502 param_count: usize,
4503 remove_task_automatically: bool,
4504 call_post_return_automatically: bool,
4505 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4506 + Send
4507 + Sync
4508 + 'static,
4509 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4510 + Send
4511 + Sync
4512 + 'static,
4513) -> Result<PreparedCall<R>> {
4514 let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4515
4516 let instance = handle.instance().id().get(store.0);
4517 let task_return_type = instance.component().types()[ty].results;
4518 let component_instance = raw_options.instance;
4519 let callback = options.callback();
4520 let memory = options.memory_raw().map(SendSyncPtr::new);
4521 let string_encoding = options.string_encoding();
4522 let token = StoreToken::new(store.as_context_mut());
4523 let state = handle.instance().concurrent_state_mut(store.0);
4524
4525 assert!(state.guest_task.is_none());
4526
4527 let (tx, rx) = oneshot::channel();
4528
4529 let mut task = GuestTask::new(
4530 state,
4531 Box::new(for_any_lower(move |store, instance, params| {
4532 debug_assert!(instance.id() == handle.instance().id());
4533 lower_params(handle, token.as_context_mut(store), params)
4534 })),
4535 LiftResult {
4536 lift: Box::new(for_any_lift(move |store, instance, result| {
4537 debug_assert!(instance.id() == handle.instance().id());
4538 lift_result(handle, store, result)
4539 })),
4540 ty: task_return_type,
4541 memory,
4542 string_encoding,
4543 },
4544 Caller::Host {
4545 tx: Some(tx),
4546 remove_task_automatically,
4547 call_post_return_automatically,
4548 },
4549 callback.map(|callback| {
4550 let callback = SendSyncPtr::new(callback);
4551 Box::new(
4552 move |store: &mut dyn VMStore,
4553 instance: Instance,
4554 runtime_instance,
4555 event,
4556 handle| {
4557 let store = token.as_context_mut(store);
4558 // SAFETY: Per the contract of `prepare_call`, the callback
4559 // will remain valid at least as long is this task exists.
4560 unsafe {
4561 instance.call_callback(
4562 store,
4563 runtime_instance,
4564 callback,
4565 event,
4566 handle,
4567 call_post_return_automatically,
4568 )
4569 }
4570 },
4571 ) as CallbackFn
4572 }),
4573 component_instance,
4574 )?;
4575 task.function_index = Some(handle.index());
4576
4577 let task = state.push(task)?;
4578
4579 Ok(PreparedCall {
4580 handle,
4581 task,
4582 param_count,
4583 rx,
4584 _phantom: PhantomData,
4585 })
4586}
4587
4588/// Queue a call previously prepared using `prepare_call` to be run as part of
4589/// the associated `ComponentInstance`'s event loop.
4590///
4591/// The returned future will resolve to the result once it is available, but
4592/// must only be polled via the instance's event loop. See
4593/// `Instance::run_concurrent` for details.
4594pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4595 mut store: StoreContextMut<T>,
4596 prepared: PreparedCall<R>,
4597) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
4598 let PreparedCall {
4599 handle,
4600 task,
4601 param_count,
4602 rx,
4603 ..
4604 } = prepared;
4605
4606 queue_call0(store.as_context_mut(), handle, task, param_count)?;
4607
4608 Ok(checked(
4609 handle.instance(),
4610 rx.map(|result| {
4611 result
4612 .map(|v| *v.downcast().unwrap())
4613 .map_err(anyhow::Error::from)
4614 }),
4615 ))
4616}
4617
4618/// Queue a call previously prepared using `prepare_call` to be run as part of
4619/// the associated `ComponentInstance`'s event loop.
4620fn queue_call0<T: 'static>(
4621 store: StoreContextMut<T>,
4622 handle: Func,
4623 guest_task: TableId<GuestTask>,
4624 param_count: usize,
4625) -> Result<()> {
4626 let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4627 let is_concurrent = raw_options.async_;
4628 let instance = handle.instance();
4629 let callee = handle.lifted_core_func(store.0);
4630 let callback = options.callback();
4631 let post_return = handle.post_return_core_func(store.0);
4632
4633 log::trace!("queueing call {guest_task:?}");
4634
4635 let instance_flags = if callback.is_none() {
4636 None
4637 } else {
4638 Some(flags)
4639 };
4640
4641 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
4642 // (with signatures appropriate for this call) and will remain valid as
4643 // long as this instance is valid.
4644 unsafe {
4645 instance.queue_call(
4646 store,
4647 guest_task,
4648 SendSyncPtr::new(callee),
4649 param_count,
4650 1,
4651 instance_flags,
4652 is_concurrent,
4653 callback.map(SendSyncPtr::new),
4654 post_return.map(SendSyncPtr::new),
4655 )
4656 }
4657}