wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `ComponentInstance` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `Instance::run_concurrent`.  The `ComponentInstance::poll_until`
14//! function contains the loop itself, while the
15//! `ComponentInstance::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted import, creating a guest task.
23//!
24//! - `Instance::run_concurrent`: Run the event loop for the specified instance,
25//! allowing any and all tasks belonging to that instance to make progress.
26//!
27//! - `Instance::spawn`: Run a background task as part of the event loop for the
28//! specified instance.
29//!
30//! - `Instance::{future,stream}`: Create a new Component Model `future` or
31//! `stream`; the read end may be passed to the guest.
32//!
33//! - `{Future,Stream}Reader::read` and `{Future,Stream}Writer::write`: read
34//! from or write to a future or stream, respectively.
35//!
36//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
37//!
38//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
39//! function with the linker.  That function will take an `Accessor` as its
40//! first parameter, which provides access to the store and instance between
41//! (but not across) await points.
42//!
43//! - `Accessor::with`: Access the store, its associated data, and the current
44//! instance.
45//!
46//! - `Accessor::spawn`: Run a background task as part of the event loop for the
47//! specified instance.  This is equivalent to `Instance::spawn` but more
48//! convenient to use in host functions.
49
50use crate::component::func::{self, Func, Options};
51use crate::component::{Component, ComponentInstanceId, HasData, HasSelf, Instance};
52use crate::fiber::{self, StoreFiber, StoreFiberYield};
53use crate::store::{StoreInner, StoreOpaque, StoreToken};
54use crate::vm::component::{CallContext, InstanceFlags, ResourceTables};
55use crate::vm::{SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
56use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
57use anyhow::{Context as _, Result, anyhow, bail};
58use error_contexts::{GlobalErrorContextRefCount, LocalErrorContextRefCount};
59use futures::channel::oneshot;
60use futures::future::{self, Either, FutureExt};
61use futures::stream::{FuturesUnordered, StreamExt};
62use futures_and_streams::{FlatAbi, ReturnCode, StreamFutureState, TableIndex, TransmitHandle};
63use states::StateTable;
64use std::any::Any;
65use std::borrow::ToOwned;
66use std::boxed::Box;
67use std::cell::UnsafeCell;
68use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
69use std::fmt;
70use std::future::Future;
71use std::marker::PhantomData;
72use std::mem::{self, ManuallyDrop, MaybeUninit};
73use std::ops::DerefMut;
74use std::pin::{Pin, pin};
75use std::ptr::{self, NonNull};
76use std::slice;
77use std::sync::Mutex;
78use std::task::{Context, Poll, Waker};
79use std::vec::Vec;
80use table::{Table, TableDebug, TableError, TableId};
81use wasmtime_environ::PrimaryMap;
82use wasmtime_environ::component::{
83    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
84    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
85    RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
86    TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
87    TypeTupleIndex,
88};
89
90pub use abort::AbortHandle;
91pub use futures_and_streams::{
92    ErrorContext, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter,
93    GuardedStreamReader, GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VecBuffer,
94    WriteBuffer,
95};
96pub(crate) use futures_and_streams::{
97    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
98};
99
100mod abort;
101mod error_contexts;
102mod futures_and_streams;
103mod states;
104mod table;
105pub(crate) mod tls;
106
107/// Constant defined in the Component Model spec to indicate that the async
108/// intrinsic (e.g. `future.write`) has not yet completed.
109const BLOCKED: u32 = 0xffff_ffff;
110
111/// Corresponds to `CallState` in the upstream spec.
112#[derive(Clone, Copy, Eq, PartialEq, Debug)]
113pub enum Status {
114    Starting = 0,
115    Started = 1,
116    Returned = 2,
117    StartCancelled = 3,
118    ReturnCancelled = 4,
119}
120
121impl Status {
122    /// Packs this status and the optional `waitable` provided into a 32-bit
123    /// result that the canonical ABI requires.
124    ///
125    /// The low 4 bits are reserved for the status while the upper 28 bits are
126    /// the waitable, if present.
127    pub fn pack(self, waitable: Option<u32>) -> u32 {
128        assert!(matches!(self, Status::Returned) == waitable.is_none());
129        let waitable = waitable.unwrap_or(0);
130        assert!(waitable < (1 << 28));
131        (waitable << 4) | (self as u32)
132    }
133}
134
135/// Corresponds to `EventCode` in the Component Model spec, plus related payload
136/// data.
137#[derive(Clone, Copy, Debug)]
138enum Event {
139    None,
140    Cancelled,
141    Subtask {
142        status: Status,
143    },
144    StreamRead {
145        code: ReturnCode,
146        pending: Option<(TypeStreamTableIndex, u32)>,
147    },
148    StreamWrite {
149        code: ReturnCode,
150        pending: Option<(TypeStreamTableIndex, u32)>,
151    },
152    FutureRead {
153        code: ReturnCode,
154        pending: Option<(TypeFutureTableIndex, u32)>,
155    },
156    FutureWrite {
157        code: ReturnCode,
158        pending: Option<(TypeFutureTableIndex, u32)>,
159    },
160}
161
162impl Event {
163    /// Lower this event to core Wasm integers for delivery to the guest.
164    ///
165    /// Note that the waitable handle, if any, is assumed to be lowered
166    /// separately.
167    fn parts(self) -> (u32, u32) {
168        const EVENT_NONE: u32 = 0;
169        const EVENT_SUBTASK: u32 = 1;
170        const EVENT_STREAM_READ: u32 = 2;
171        const EVENT_STREAM_WRITE: u32 = 3;
172        const EVENT_FUTURE_READ: u32 = 4;
173        const EVENT_FUTURE_WRITE: u32 = 5;
174        const EVENT_CANCELLED: u32 = 6;
175        match self {
176            Event::None => (EVENT_NONE, 0),
177            Event::Cancelled => (EVENT_CANCELLED, 0),
178            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
179            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
180            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
181            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
182            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
183        }
184    }
185}
186
187/// Corresponds to `CallbackCode` in the spec.
188mod callback_code {
189    pub const EXIT: u32 = 0;
190    pub const YIELD: u32 = 1;
191    pub const WAIT: u32 = 2;
192    pub const POLL: u32 = 3;
193}
194
195/// A flag indicating that the callee is an async-lowered export.
196///
197/// This may be passed to the `async-start` intrinsic from a fused adapter.
198const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
199
200/// Provides access to either store data (via the `get` method) or the store
201/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
202/// instance to which the current host task belongs.
203///
204/// See [`Accessor::with`] for details.
205pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
206    accessor: &'a Accessor<T, D>,
207    store: StoreContextMut<'a, T>,
208}
209
210impl<'a, T, D> Access<'a, T, D>
211where
212    D: HasData + ?Sized,
213    T: 'static,
214{
215    /// Get mutable access to the store data.
216    pub fn data_mut(&mut self) -> &mut T {
217        self.store.data_mut()
218    }
219
220    /// Get mutable access to the store data.
221    pub fn get(&mut self) -> D::Data<'_> {
222        let get_data = self.accessor.get_data;
223        get_data(self.data_mut())
224    }
225
226    /// Spawn a background task.
227    ///
228    /// See [`Accessor::spawn`] for details.
229    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> AbortHandle
230    where
231        T: 'static,
232    {
233        self.accessor.instance.unwrap().spawn_with_accessor(
234            self.store.as_context_mut(),
235            self.accessor.clone_for_spawn(),
236            task,
237        )
238    }
239
240    /// Retrieve the component instance of the caller.
241    pub fn instance(&self) -> Instance {
242        self.accessor.instance()
243    }
244}
245
246impl<'a, T, D> AsContext for Access<'a, T, D>
247where
248    D: HasData + ?Sized,
249    T: 'static,
250{
251    type Data = T;
252
253    fn as_context(&self) -> StoreContext<'_, T> {
254        self.store.as_context()
255    }
256}
257
258impl<'a, T, D> AsContextMut for Access<'a, T, D>
259where
260    D: HasData + ?Sized,
261    T: 'static,
262{
263    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
264        self.store.as_context_mut()
265    }
266}
267
268/// Provides scoped mutable access to store data in the context of a concurrent
269/// host task future.
270///
271/// This allows multiple host task futures to execute concurrently and access
272/// the store between (but not across) `await` points.
273///
274/// # Rationale
275///
276/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
277/// `D::Data<'_>`. The problem this is solving, however, is that it does not
278/// literally store these values. The basic problem is that when a concurrent
279/// host future is being polled it has access to `&mut T` (and the whole
280/// `Store`) but when it's not being polled it does not have access to these
281/// values. This reflects how the store is only ever polling one future at a
282/// time so the store is effectively being passed between futures.
283///
284/// Rust's `Future` trait, however, has no means of passing a `Store`
285/// temporarily between futures. The [`Context`](std::task::Context) type does
286/// not have the ability to attach arbitrary information to it at this time.
287/// This type, [`Accessor`], is used to bridge this expressivity gap.
288///
289/// The [`Accessor`] type here represents the ability to acquire, temporarily in
290/// a synchronous manner, the current store. The [`Accessor::with`] function
291/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
292/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
293/// not take an `async` closure as its argument, instead it's a synchronous
294/// closure which must complete during on run of `Future::poll`. This reflects
295/// how the store is temporarily made available while a host future is being
296/// polled.
297///
298/// # Implementation
299///
300/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
301/// this type additionally doesn't even have a lifetime parameter. This is
302/// instead a representation of proof of the ability to acquire these while a
303/// future is being polled. Wasmtime will, when it polls a host future,
304/// configure ambient state such that the `Accessor` that a future closes over
305/// will work and be able to access the store.
306///
307/// This has a number of implications for users such as:
308///
309/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
310///   the lifetime of a single future.
311/// * A futures is expected to, however, close over an `Accessor` and keep it
312///   alive probably for the duration of the entire future.
313/// * Different host futures will be given different `Accessor`s, and that's
314///   intentional.
315/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
316///   alleviates some otherwise required bounds to be written down.
317///
318/// # Using `Accessor` in `Drop`
319///
320/// The methods on `Accessor` are only expected to work in the context of
321/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
322/// host future can be dropped at any time throughout the system and Wasmtime
323/// store context is not necessarily available at that time. It's recommended to
324/// not use `Accessor` methods in anything connected to a `Drop` implementation
325/// as they will panic and have unintended results. If you run into this though
326/// feel free to file an issue on the Wasmtime repository.
327pub struct Accessor<T: 'static, D = HasSelf<T>>
328where
329    D: HasData + ?Sized,
330{
331    token: StoreToken<T>,
332    get_data: fn(&mut T) -> D::Data<'_>,
333    instance: Option<Instance>,
334}
335
336/// A helper trait to take any type of accessor-with-data in functions.
337///
338/// This trait is similar to [`AsContextMut`] except that it's used when
339/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
340/// [`Accessor`] is the main type used in concurrent settings and is passed to
341/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
342///
343/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
344/// this trait. This effectively means that regardless of the `D` in
345/// `Accessor<T, D>` it can still be passed to a function which just needs a
346/// store accessor.
347///
348/// Acquiring an [`Accessor`] can be done through [`Instance::run_concurrent`]
349/// for example or in a host function through
350/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
351pub trait AsAccessor {
352    /// The `T` in `Store<T>` that this accessor refers to.
353    type Data: 'static;
354
355    /// The `D` in `Accessor<T, D>`, or the projection out of
356    /// `Self::Data`.
357    type AccessorData: HasData + ?Sized;
358
359    /// Returns the accessor that this is referring to.
360    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
361}
362
363impl<T: AsAccessor + ?Sized> AsAccessor for &T {
364    type Data = T::Data;
365    type AccessorData = T::AccessorData;
366
367    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
368        T::as_accessor(self)
369    }
370}
371
372impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
373    type Data = T;
374    type AccessorData = D;
375
376    fn as_accessor(&self) -> &Accessor<T, D> {
377        self
378    }
379}
380
381// Note that it is intentional at this time that `Accessor` does not actually
382// store `&mut T` or anything similar. This distinctly enables the `Accessor`
383// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
384// that matter). This is used to ergonomically simplify bindings where the
385// majority of the time `Accessor` is closed over in a future which then needs
386// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
387// you already have to write `T: 'static`...) it helps to avoid this.
388//
389// Note as well that `Accessor` doesn't actually store its data at all. Instead
390// it's more of a "proof" of what can be accessed from TLS. API design around
391// `Accessor` and functions like `Linker::func_wrap_concurrent` are
392// intentionally made to ensure that `Accessor` is ideally only used in the
393// context that TLS variables are actually set. For example host functions are
394// given `&Accessor`, not `Accessor`, and this prevents them from persisting
395// the value outside of a future. Within the future the TLS variables are all
396// guaranteed to be set while the future is being polled.
397//
398// Finally though this is not an ironclad guarantee, but nor does it need to be.
399// The TLS APIs are designed to panic or otherwise model usage where they're
400// called recursively or similar. It's hoped that code cannot be constructed to
401// actually hit this at runtime but this is not a safety requirement at this
402// time.
403const _: () = {
404    const fn assert<T: Send + Sync>() {}
405    assert::<Accessor<UnsafeCell<u32>>>();
406};
407
408impl<T> Accessor<T> {
409    /// Creates a new `Accessor` backed by the specified functions.
410    ///
411    /// - `get`: used to retrieve the store
412    ///
413    /// - `get_data`: used to "project" from the store's associated data to
414    /// another type (e.g. a field of that data or a wrapper around it).
415    ///
416    /// - `spawn`: used to queue spawned background tasks to be run later
417    ///
418    /// - `instance`: used to access the `Instance` to which this `Accessor`
419    /// (and the future which closes over it) belongs
420    pub(crate) fn new(token: StoreToken<T>, instance: Option<Instance>) -> Self {
421        Self {
422            token,
423            get_data: |x| x,
424            instance,
425        }
426    }
427}
428
429impl<T, D> Accessor<T, D>
430where
431    D: HasData + ?Sized,
432{
433    /// Run the specified closure, passing it mutable access to the store.
434    ///
435    /// This function is one of the main building blocks of the [`Accessor`]
436    /// type. This yields synchronous, blocking, access to store via an
437    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
438    /// providing the ability to access `D` via [`Access::get`]. Note that the
439    /// `fun` here is given only temporary access to the store and `T`/`D`
440    /// meaning that the return value `R` here is not allowed to capture borrows
441    /// into the two. If access is needed to data within `T` or `D` outside of
442    /// this closure then it must be `clone`d out, for example.
443    ///
444    /// # Panics
445    ///
446    /// This function will panic if it is call recursively with any other
447    /// accessor already in scope. For example if `with` is called within `fun`,
448    /// then this function will panic. It is up to the embedder to ensure that
449    /// this does not happen.
450    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
451        tls::get(|vmstore| {
452            fun(Access {
453                store: self.token.as_context_mut(vmstore),
454                accessor: self,
455            })
456        })
457    }
458
459    /// Changes this accessor to access `D2` instead of the current type
460    /// parameter `D`.
461    ///
462    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
463    ///
464    /// Note that this is not a public or recommended API because it's easy to
465    /// cause panics with this by having two `Accessor` values live at the same
466    /// time. The returned `Accessor` does not refer to this `Accessor` meaning
467    /// that both can be used. You could, for example, call `Accessor::with`
468    /// simultaneously on both. That would cause a panic though.
469    ///
470    /// In short while there's nothing unsafe about this it's a footgun. It's
471    /// here for bindings generation where the provided accessor is transformed
472    /// into a new accessor and then this returned accessor is passed to
473    /// implementations.
474    ///
475    /// Note that one possible fix for this would be a lifetime parameter on
476    /// `Accessor` itself so the returned value could borrow from the original
477    /// value (or this could be `self`-by-value instead of `&mut self`) but in
478    /// attempting that it was found to be a bit too onerous in terms of
479    /// plumbing things around without a whole lot of benefit.
480    ///
481    /// In short, this works, but must be treated with care. The current main
482    /// user, bindings generation, treats this with care.
483    #[doc(hidden)]
484    pub fn with_data<D2: HasData>(&self, get_data: fn(&mut T) -> D2::Data<'_>) -> Accessor<T, D2> {
485        Accessor {
486            token: self.token,
487            get_data,
488            instance: self.instance,
489        }
490    }
491
492    /// Spawn a background task which will receive an `&Accessor<T, D>` and
493    /// run concurrently with any other tasks in progress for the current
494    /// instance.
495    ///
496    /// This is particularly useful for host functions which return a `stream`
497    /// or `future` such that the code to write to the write end of that
498    /// `stream` or `future` must run after the function returns.
499    ///
500    /// The returned [`AbortHandle`] may be used to cancel the task.
501    ///
502    /// # Panics
503    ///
504    /// Panics if called within a closure provided to the [`Accessor::with`]
505    /// function. This can only be called outside an active invocation of
506    /// [`Accessor::with`].
507    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> AbortHandle
508    where
509        T: 'static,
510    {
511        let instance = self.instance.unwrap();
512        let accessor = self.clone_for_spawn();
513        self.with(|mut access| {
514            instance.spawn_with_accessor(access.as_context_mut(), accessor, task)
515        })
516    }
517
518    /// Retrieve the component instance of the caller.
519    pub fn instance(&self) -> Instance {
520        self.instance.unwrap()
521    }
522
523    fn clone_for_spawn(&self) -> Self {
524        Self {
525            token: self.token,
526            get_data: self.get_data,
527            instance: self.instance,
528        }
529    }
530}
531
532/// Represents a task which may be provided to `Accessor::spawn`,
533/// `Accessor::forward`, or `Instance::spawn`.
534// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
535// option.
536//
537// `AsyncFnOnce` is still nightly-only in latest stable Rust version as of this
538// writing (1.84.1), and even with 1.85.0-beta it's not possible to specify
539// e.g. `Send` and `Sync` bounds on the `Future` type returned by an
540// `AsyncFnOnce`.  Also, using `F: Future<Output = Result<()>> + Send + Sync,
541// FN: FnOnce(&Accessor<T>) -> F + Send + Sync + 'static` fails with a type
542// mismatch error when we try to pass it an async closure (e.g. `async move |_|
543// { ... }`).  So this seems to be the best we can do for the time being.
544pub trait AccessorTask<T, D, R>: Send + 'static
545where
546    D: HasData + ?Sized,
547{
548    /// Run the task.
549    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
550}
551
552/// Represents the state of a waitable handle.
553#[derive(Debug)]
554enum WaitableState {
555    /// Represents a host task handle.
556    HostTask,
557    /// Represents a guest task handle.
558    GuestTask,
559    /// Represents a stream handle.
560    Stream(TypeStreamTableIndex, StreamFutureState),
561    /// Represents a future handle.
562    Future(TypeFutureTableIndex, StreamFutureState),
563    /// Represents a waitable-set handle.
564    Set,
565}
566
567/// Represents parameter and result metadata for the caller side of a
568/// guest->guest call orchestrated by a fused adapter.
569enum CallerInfo {
570    /// Metadata for a call to an async-lowered import
571    Async {
572        params: Vec<ValRaw>,
573        has_result: bool,
574    },
575    /// Metadata for a call to an sync-lowered import
576    Sync {
577        params: Vec<ValRaw>,
578        result_count: u32,
579    },
580}
581
582/// Indicates how a guest task is waiting on a waitable set.
583enum WaitMode {
584    /// The guest task is waiting using `task.wait`
585    Fiber(StoreFiber<'static>),
586    /// The guest task is waiting via a callback declared as part of an
587    /// async-lifted export.
588    Callback(RuntimeComponentInstanceIndex),
589}
590
591/// Represents the reason a fiber is suspending itself.
592#[derive(Debug)]
593enum SuspendReason {
594    /// The fiber is waiting for an event to be delivered to the specified
595    /// waitable set or task.
596    Waiting {
597        set: TableId<WaitableSet>,
598        task: TableId<GuestTask>,
599    },
600    /// The fiber has finished handling its most recent work item and is waiting
601    /// for another (or to be dropped if it is no longer needed).
602    NeedWork,
603    /// The fiber is yielding and should be resumed once other tasks have had a
604    /// chance to run.
605    Yielding { task: TableId<GuestTask> },
606}
607
608/// Represents a pending call into guest code for a given guest task.
609enum GuestCallKind {
610    /// Indicates there's an event to deliver to the task, possibly related to a
611    /// waitable set the task has been waiting on or polling.
612    DeliverEvent {
613        /// The (sub-)component instance in which the task has most recently
614        /// been executing.
615        ///
616        /// Note that this might not be the same as the instance the guest task
617        /// started executing in given that one or more synchronous guest->guest
618        /// calls may have occurred involving multiple instances.
619        instance: RuntimeComponentInstanceIndex,
620        /// The waitable set the event belongs to, if any.
621        ///
622        /// If this is `None` the event will be waiting in the
623        /// `GuestTask::event` field for the task.
624        set: Option<TableId<WaitableSet>>,
625    },
626    /// Indicates that a new guest task call is pending and may be executed
627    /// using the specified closure.
628    Start(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>),
629}
630
631impl fmt::Debug for GuestCallKind {
632    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
633        match self {
634            Self::DeliverEvent { instance, set } => f
635                .debug_struct("DeliverEvent")
636                .field("instance", instance)
637                .field("set", set)
638                .finish(),
639            Self::Start(_) => f.debug_tuple("Start").finish(),
640        }
641    }
642}
643
644/// Represents a pending call into guest code for a given guest task.
645#[derive(Debug)]
646struct GuestCall {
647    task: TableId<GuestTask>,
648    kind: GuestCallKind,
649}
650
651impl GuestCall {
652    /// Returns whether or not the call is ready to run.
653    ///
654    /// A call will not be ready to run if either:
655    ///
656    /// - the (sub-)component instance to be called has already been entered and
657    /// cannot be reentered until an in-progress call completes
658    ///
659    /// - the call is for a not-yet started task and the (sub-)component
660    /// instance to be called has backpressure enabled
661    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
662        let task_instance = state.get(self.task)?.instance;
663        let state = state.instance_state(task_instance);
664        let ready = match &self.kind {
665            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
666            GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure),
667        };
668        log::trace!(
669            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
670            state.do_not_enter,
671            state.backpressure
672        );
673        Ok(ready)
674    }
675}
676
677/// Job to be run on a worker fiber.
678enum WorkerItem {
679    GuestCall(GuestCall),
680    Function(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
681}
682
683/// Represents state related to an in-progress poll operation (e.g. `task.poll`
684/// or `CallbackCode.POLL`).
685#[derive(Debug)]
686struct PollParams {
687    /// Identifies the polling task.
688    task: TableId<GuestTask>,
689    /// The waitable set being polled.
690    set: TableId<WaitableSet>,
691    /// The (sub-)component instance in which the task has most recently been
692    /// executing.
693    ///
694    /// Note that this might not be the same as the instance the guest task
695    /// started executing in given that one or more synchronous guest->guest
696    /// calls may have occurred involving multiple instances.
697    instance: RuntimeComponentInstanceIndex,
698}
699
700/// Represents a pending work item to be handled by the event loop for a given
701/// component instance.
702enum WorkItem {
703    /// A host task to be pushed to `ConcurrentState::futures`.
704    PushFuture(Mutex<HostTaskFuture>),
705    /// A fiber to resume.
706    ResumeFiber(StoreFiber<'static>),
707    /// A pending call into guest code for a given guest task.
708    GuestCall(GuestCall),
709    /// A pending `task.poll` or `CallbackCode.POLL` operation.
710    Poll(PollParams),
711    /// A job to run on a worker fiber.
712    WorkerFunction(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
713}
714
715impl fmt::Debug for WorkItem {
716    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
717        match self {
718            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
719            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
720            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
721            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
722            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
723        }
724    }
725}
726
727impl ConcurrentState {
728    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
729        self.instance_states.entry(instance).or_default()
730    }
731
732    fn push<V: Send + Sync + 'static>(&mut self, value: V) -> Result<TableId<V>, TableError> {
733        self.table.push(value)
734    }
735
736    fn get<V: 'static>(&self, id: TableId<V>) -> Result<&V, TableError> {
737        self.table.get(id)
738    }
739
740    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, TableError> {
741        self.table.get_mut(id)
742    }
743
744    pub fn add_child<T, U>(
745        &mut self,
746        child: TableId<T>,
747        parent: TableId<U>,
748    ) -> Result<(), TableError> {
749        self.table.add_child(child, parent)
750    }
751
752    pub fn remove_child<T, U>(
753        &mut self,
754        child: TableId<T>,
755        parent: TableId<U>,
756    ) -> Result<(), TableError> {
757        self.table.remove_child(child, parent)
758    }
759
760    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, TableError> {
761        self.table.delete(id)
762    }
763
764    fn push_future(&mut self, future: HostTaskFuture) {
765        // Note that we can't directly push to `ConcurrentState::futures` here
766        // since this may be called from a future that's being polled inside
767        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
768        // so it has exclusive access while polling it.  Therefore, we push a
769        // work item to the "high priority" queue, which will actually push to
770        // `ConcurrentState::futures` later.
771        self.push_high_priority(WorkItem::PushFuture(Mutex::new(future)));
772    }
773
774    fn push_high_priority(&mut self, item: WorkItem) {
775        log::trace!("push high priority: {item:?}");
776        self.high_priority.push(item);
777    }
778
779    fn push_low_priority(&mut self, item: WorkItem) {
780        log::trace!("push low priority: {item:?}");
781        self.low_priority.push(item);
782    }
783
784    /// Determine whether the instance associated with the specified guest task
785    /// may be entered (i.e. is not already on the async call stack).
786    ///
787    /// This is an additional check on top of the "may_enter" instance flag;
788    /// it's needed because async-lifted exports with callback functions must
789    /// not call their own instances directly or indirectly, and due to the
790    /// "stackless" nature of callback-enabled guest tasks this may happen even
791    /// if there are no activation records on the stack (i.e. the "may_enter"
792    /// field is `true`) for that instance.
793    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
794        let guest_instance = self.get(guest_task).unwrap().instance;
795
796        // Walk the task tree back to the root, looking for potential
797        // reentrance.
798        //
799        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
800        // such that each bit represents and instance which has been entered by
801        // that task or an ancestor of that task, in which case this would be a
802        // constant time check.
803        loop {
804            match &self.get_mut(guest_task).unwrap().caller {
805                Caller::Host { .. } => break true,
806                Caller::Guest { task, instance } => {
807                    if *instance == guest_instance {
808                        break false;
809                    } else {
810                        guest_task = *task;
811                    }
812                }
813            }
814        }
815    }
816
817    /// Handle the `CallbackCode` returned from an async-lifted export or its
818    /// callback.
819    ///
820    /// If `initial_call` is `true`, then the code was received from the
821    /// async-lifted export; otherwise, it was received from its callback.
822    fn handle_callback_code(
823        &mut self,
824        guest_task: TableId<GuestTask>,
825        runtime_instance: RuntimeComponentInstanceIndex,
826        code: u32,
827        initial_call: bool,
828    ) -> Result<()> {
829        let (code, set) = unpack_callback_code(code);
830
831        log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");
832
833        let task = self.get_mut(guest_task)?;
834
835        if task.lift_result.is_some() {
836            if code == callback_code::EXIT {
837                return Err(anyhow!(crate::Trap::NoAsyncResult));
838            }
839            if initial_call {
840                // Notify any current or future waiters that this subtask has
841                // started.
842                Waitable::Guest(guest_task).set_event(
843                    self,
844                    Some(Event::Subtask {
845                        status: Status::Started,
846                    }),
847                )?;
848            }
849        }
850
851        let get_set = |instance: &mut Self, handle| {
852            if handle == 0 {
853                bail!("invalid waitable-set handle");
854            }
855
856            let (set, WaitableState::Set) =
857                instance.waitable_tables[runtime_instance].get_mut_by_index(handle)?
858            else {
859                bail!("invalid waitable-set handle");
860            };
861
862            Ok(TableId::<WaitableSet>::new(set))
863        };
864
865        match code {
866            callback_code::EXIT => {
867                let task = self.get_mut(guest_task)?;
868                match &task.caller {
869                    Caller::Host {
870                        remove_task_automatically,
871                        ..
872                    } => {
873                        if *remove_task_automatically {
874                            log::trace!("handle_callback_code will delete task {guest_task:?}");
875                            Waitable::Guest(guest_task).delete_from(self)?;
876                        }
877                    }
878                    Caller::Guest { .. } => {
879                        task.exited = true;
880                        task.callback = None;
881                    }
882                }
883            }
884            callback_code::YIELD => {
885                // Push this task onto the "low priority" queue so it runs after
886                // any other tasks have had a chance to run.
887                let task = self.get_mut(guest_task)?;
888                assert!(task.event.is_none());
889                task.event = Some(Event::None);
890                self.push_low_priority(WorkItem::GuestCall(GuestCall {
891                    task: guest_task,
892                    kind: GuestCallKind::DeliverEvent {
893                        instance: runtime_instance,
894                        set: None,
895                    },
896                }));
897            }
898            callback_code::WAIT | callback_code::POLL => {
899                let set = get_set(self, set)?;
900
901                if self.get_mut(guest_task)?.event.is_some() || !self.get_mut(set)?.ready.is_empty()
902                {
903                    // An event is immediately available; deliver it ASAP.
904                    self.push_high_priority(WorkItem::GuestCall(GuestCall {
905                        task: guest_task,
906                        kind: GuestCallKind::DeliverEvent {
907                            instance: runtime_instance,
908                            set: Some(set),
909                        },
910                    }));
911                } else {
912                    // No event is immediately available.
913                    match code {
914                        callback_code::POLL => {
915                            // We're polling, so just yield and check whether an
916                            // event has arrived after that.
917                            self.push_low_priority(WorkItem::Poll(PollParams {
918                                task: guest_task,
919                                instance: runtime_instance,
920                                set,
921                            }));
922                        }
923                        callback_code::WAIT => {
924                            // We're waiting, so register to be woken up when an
925                            // event is published for this waitable set.
926                            //
927                            // Here we also set `GuestTask::wake_on_cancel`
928                            // which allows `subtask.cancel` to interrupt the
929                            // wait.
930                            let old = self.get_mut(guest_task)?.wake_on_cancel.replace(set);
931                            assert!(old.is_none());
932                            let old = self
933                                .get_mut(set)?
934                                .waiting
935                                .insert(guest_task, WaitMode::Callback(runtime_instance));
936                            assert!(old.is_none());
937                        }
938                        _ => unreachable!(),
939                    }
940                }
941            }
942            _ => bail!("unsupported callback code: {code}"),
943        }
944
945        Ok(())
946    }
947
948    /// Record that we're about to enter a (sub-)component instance which does
949    /// not support more than one concurrent, stackful activation, meaning it
950    /// cannot be entered again until the next call returns.
951    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
952        self.instance_state(instance).do_not_enter = true;
953    }
954
955    /// Record that we've exited a (sub-)component instance previously entered
956    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
957    /// See the documentation for the latter for details.
958    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
959        self.instance_state(instance).do_not_enter = false;
960        self.partition_pending(instance)
961    }
962
963    /// Iterate over `InstanceState::pending`, moving any ready items into the
964    /// "high priority" work item queue.
965    ///
966    /// See `GuestCall::is_ready` for details.
967    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
968        for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
969            let call = GuestCall { task, kind };
970            if call.is_ready(self)? {
971                self.push_high_priority(WorkItem::GuestCall(call));
972            } else {
973                self.instance_state(instance)
974                    .pending
975                    .insert(call.task, call.kind);
976            }
977        }
978
979        Ok(())
980    }
981
982    /// Get the next pending event for the specified task and (optional)
983    /// waitable set, along with the waitable handle if applicable.
984    fn get_event(
985        &mut self,
986        guest_task: TableId<GuestTask>,
987        instance: RuntimeComponentInstanceIndex,
988        set: Option<TableId<WaitableSet>>,
989    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
990        Ok(
991            if let Some(event) = self.get_mut(guest_task)?.event.take() {
992                log::trace!("deliver event {event:?} to {guest_task:?}");
993
994                Some((event, None))
995            } else if let Some((set, waitable)) = set
996                .and_then(|set| {
997                    self.get_mut(set)
998                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
999                        .transpose()
1000                })
1001                .transpose()?
1002            {
1003                let event = waitable.common(self)?.event.take().unwrap();
1004
1005                log::trace!(
1006                    "deliver event {event:?} to {guest_task:?} for {waitable:?}; set {set:?}"
1007                );
1008
1009                let entry = self.waitable_tables[instance].get_mut_by_rep(waitable.rep());
1010                let Some((
1011                    handle,
1012                    WaitableState::HostTask
1013                    | WaitableState::GuestTask
1014                    | WaitableState::Stream(..)
1015                    | WaitableState::Future(..),
1016                )) = entry
1017                else {
1018                    bail!("handle not found for waitable rep {waitable:?} instance {instance:?}");
1019                };
1020
1021                waitable.on_delivery(self, event);
1022
1023                Some((event, Some((waitable, handle))))
1024            } else {
1025                None
1026            },
1027        )
1028    }
1029
1030    /// Implements the `backpressure.set` intrinsic.
1031    pub(crate) fn backpressure_set(
1032        &mut self,
1033        caller_instance: RuntimeComponentInstanceIndex,
1034        enabled: u32,
1035    ) -> Result<()> {
1036        let state = self.instance_state(caller_instance);
1037        let old = state.backpressure;
1038        let new = enabled != 0;
1039        state.backpressure = new;
1040
1041        if old && !new {
1042            // Backpressure was previously enabled and is now disabled; move any
1043            // newly-eligible guest calls to the "high priority" queue.
1044            self.partition_pending(caller_instance)?;
1045        }
1046
1047        Ok(())
1048    }
1049
1050    /// Implements the `waitable-set.new` intrinsic.
1051    pub(crate) fn waitable_set_new(
1052        &mut self,
1053        caller_instance: RuntimeComponentInstanceIndex,
1054    ) -> Result<u32> {
1055        let set = self.push(WaitableSet::default())?;
1056        let handle = self.waitable_tables[caller_instance].insert(set.rep(), WaitableState::Set)?;
1057        log::trace!("new waitable set {set:?} (handle {handle})");
1058        Ok(handle)
1059    }
1060
1061    /// Implements the `waitable-set.drop` intrinsic.
1062    pub(crate) fn waitable_set_drop(
1063        &mut self,
1064        caller_instance: RuntimeComponentInstanceIndex,
1065        set: u32,
1066    ) -> Result<()> {
1067        let (rep, WaitableState::Set) =
1068            self.waitable_tables[caller_instance].remove_by_index(set)?
1069        else {
1070            bail!("invalid waitable-set handle");
1071        };
1072
1073        log::trace!("drop waitable set {rep} (handle {set})");
1074
1075        let set = self.delete(TableId::<WaitableSet>::new(rep))?;
1076
1077        if !set.waiting.is_empty() {
1078            bail!("cannot drop waitable set with waiters");
1079        }
1080
1081        Ok(())
1082    }
1083
1084    /// Implements the `waitable.join` intrinsic.
1085    pub(crate) fn waitable_join(
1086        &mut self,
1087        caller_instance: RuntimeComponentInstanceIndex,
1088        waitable_handle: u32,
1089        set_handle: u32,
1090    ) -> Result<()> {
1091        let waitable = Waitable::from_instance(self, caller_instance, waitable_handle)?;
1092
1093        let set = if set_handle == 0 {
1094            None
1095        } else {
1096            let (set, WaitableState::Set) =
1097                self.waitable_tables[caller_instance].get_mut_by_index(set_handle)?
1098            else {
1099                bail!("invalid waitable-set handle");
1100            };
1101
1102            Some(TableId::<WaitableSet>::new(set))
1103        };
1104
1105        log::trace!(
1106            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
1107        );
1108
1109        waitable.join(self, set)
1110    }
1111
1112    /// Implements the `subtask.drop` intrinsic.
1113    pub(crate) fn subtask_drop(
1114        &mut self,
1115        caller_instance: RuntimeComponentInstanceIndex,
1116        task_id: u32,
1117    ) -> Result<()> {
1118        self.waitable_join(caller_instance, task_id, 0)?;
1119
1120        let (rep, state) = self.waitable_tables[caller_instance].remove_by_index(task_id)?;
1121
1122        let (waitable, expected_caller_instance, delete) = match state {
1123            WaitableState::HostTask => {
1124                let id = TableId::<HostTask>::new(rep);
1125                let task = self.get(id)?;
1126                if task.abort_handle.is_some() {
1127                    bail!("cannot drop a subtask which has not yet resolved");
1128                }
1129                (Waitable::Host(id), task.caller_instance, true)
1130            }
1131            WaitableState::GuestTask => {
1132                let id = TableId::<GuestTask>::new(rep);
1133                let task = self.get(id)?;
1134                if task.lift_result.is_some() {
1135                    bail!("cannot drop a subtask which has not yet resolved");
1136                }
1137                if let Caller::Guest { instance, .. } = &task.caller {
1138                    (Waitable::Guest(id), *instance, task.exited)
1139                } else {
1140                    unreachable!()
1141                }
1142            }
1143            _ => bail!("invalid task handle: {task_id}"),
1144        };
1145
1146        if waitable.take_event(self)?.is_some() {
1147            bail!("cannot drop a subtask with an undelivered event");
1148        }
1149
1150        if delete {
1151            waitable.delete_from(self)?;
1152        }
1153
1154        // Since waitables can neither be passed between instances nor forged,
1155        // this should never fail unless there's a bug in Wasmtime, but we check
1156        // here to be sure:
1157        assert_eq!(expected_caller_instance, caller_instance);
1158        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
1159        Ok(())
1160    }
1161
1162    /// Implements the `context.get` intrinsic.
1163    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
1164        let task = self.guest_task.unwrap();
1165        let val = self.get(task)?.context[usize::try_from(slot).unwrap()];
1166        log::trace!("context_get {task:?} slot {slot} val {val:#x}");
1167        Ok(val)
1168    }
1169
1170    /// Implements the `context.set` intrinsic.
1171    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
1172        let task = self.guest_task.unwrap();
1173        log::trace!("context_set {task:?} slot {slot} val {val:#x}");
1174        self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
1175        Ok(())
1176    }
1177
1178    fn options(&self, options: OptionsIndex) -> &CanonicalOptions {
1179        &self.component.env_component().options[options]
1180    }
1181}
1182
1183impl Instance {
1184    /// Enable or disable concurrent state debugging mode for e.g. integration
1185    /// tests.
1186    ///
1187    /// This will avoid re-using deleted handles, making it easier to catch
1188    /// e.g. "use-after-delete" and "double-delete" errors.  It can also make
1189    /// reading trace output easier since it ensures handles are never
1190    /// repurposed.
1191    #[doc(hidden)]
1192    pub fn enable_concurrent_state_debug(&self, mut store: impl AsContextMut, enable: bool) {
1193        self.id()
1194            .get_mut(store.as_context_mut().0)
1195            .concurrent_state_mut()
1196            .table
1197            .enable_debug(enable);
1198        // TODO: do the same for the tables holding guest-facing handles
1199    }
1200
1201    /// Assert that all the relevant tables and queues in the concurrent state
1202    /// for this instance are empty.
1203    ///
1204    /// This is for sanity checking in integration tests
1205    /// (e.g. `component-async-tests`) that the relevant state has been cleared
1206    /// after each test concludes.  This should help us catch leaks, e.g. guest
1207    /// tasks which haven't been deleted despite having completed and having
1208    /// been dropped by their supertasks.
1209    #[doc(hidden)]
1210    pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) {
1211        let state = self
1212            .id()
1213            .get_mut(store.as_context_mut().0)
1214            .concurrent_state_mut();
1215        assert!(state.table.is_empty(), "non-empty table: {:?}", state.table);
1216        assert!(state.high_priority.is_empty());
1217        assert!(state.low_priority.is_empty());
1218        assert!(state.guest_task.is_none());
1219        assert!(
1220            state
1221                .futures
1222                .get_mut()
1223                .unwrap()
1224                .as_ref()
1225                .unwrap()
1226                .is_empty()
1227        );
1228        assert!(
1229            state
1230                .waitable_tables
1231                .iter()
1232                .all(|(_, table)| table.is_empty())
1233        );
1234        assert!(
1235            state
1236                .instance_states
1237                .iter()
1238                .all(|(_, state)| state.pending.is_empty())
1239        );
1240        assert!(
1241            state
1242                .error_context_tables
1243                .iter()
1244                .all(|(_, table)| table.is_empty())
1245        );
1246        assert!(state.global_error_context_ref_counts.is_empty());
1247    }
1248
1249    /// Run the specified closure `fun` to completion as part of this instance's
1250    /// event loop.
1251    ///
1252    /// Like [`Self::run`], this will run `fun` as part of this instance's event
1253    /// loop until it yields a result _or_ there are no more tasks to run.
1254    /// Unlike [`Self::run`], `fun` is provided an [`Accessor`], which provides
1255    /// controlled access to the `Store` and its data.
1256    ///
1257    /// This function can be used to invoke [`Func::call_concurrent`] for
1258    /// example within the async closure provided here.
1259    ///
1260    /// # Example
1261    ///
1262    /// ```
1263    /// # use {
1264    /// #   anyhow::{Result},
1265    /// #   wasmtime::{
1266    /// #     component::{ Component, Linker, Resource, ResourceTable},
1267    /// #     Config, Engine, Store
1268    /// #   },
1269    /// # };
1270    /// #
1271    /// # struct MyResource(u32);
1272    /// # struct Ctx { table: ResourceTable }
1273    /// #
1274    /// # async fn foo() -> Result<()> {
1275    /// # let mut config = Config::new();
1276    /// # let engine = Engine::new(&config)?;
1277    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1278    /// # let mut linker = Linker::new(&engine);
1279    /// # let component = Component::new(&engine, "")?;
1280    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1281    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1282    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1283    /// instance.run_concurrent(&mut store, async |accessor| -> wasmtime::Result<_> {
1284    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1285    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1286    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1287    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1288    ///    Ok(())
1289    /// }).await??;
1290    /// # Ok(())
1291    /// # }
1292    /// ```
1293    pub async fn run_concurrent<T, R>(
1294        self,
1295        mut store: impl AsContextMut<Data = T>,
1296        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1297    ) -> Result<R>
1298    where
1299        T: 'static,
1300    {
1301        check_recursive_run();
1302        let mut store = store.as_context_mut();
1303        let token = StoreToken::new(store.as_context_mut());
1304
1305        struct Dropper<'a, T: 'static, V> {
1306            store: StoreContextMut<'a, T>,
1307            value: ManuallyDrop<V>,
1308        }
1309
1310        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1311            fn drop(&mut self) {
1312                tls::set(self.store.0.traitobj_mut(), || {
1313                    // SAFETY: Here we drop the value without moving it for the
1314                    // first and only time -- per the contract for `Drop::drop`,
1315                    // this code won't run again, and the `value` field will no
1316                    // longer be accessible.
1317                    unsafe { ManuallyDrop::drop(&mut self.value) }
1318                });
1319            }
1320        }
1321
1322        let accessor = &Accessor::new(token, Some(self));
1323        let dropper = &mut Dropper {
1324            store,
1325            value: ManuallyDrop::new(fun(accessor)),
1326        };
1327        // SAFETY: We never move `dropper` nor its `value` field.
1328        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1329
1330        self.poll_until(dropper.store.as_context_mut(), future)
1331            .await
1332    }
1333
1334    /// Spawn a background task to run as part of this instance's event loop.
1335    ///
1336    /// The task will receive an `&Accessor<U>` and run concurrently with
1337    /// any other tasks in progress for the instance.
1338    ///
1339    /// Note that the task will only make progress if and when the event loop
1340    /// for this instance is run.
1341    ///
1342    /// The returned [`SpawnHandle`] may be used to cancel the task.
1343    pub fn spawn<U: 'static>(
1344        self,
1345        mut store: impl AsContextMut<Data = U>,
1346        task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
1347    ) -> AbortHandle {
1348        let mut store = store.as_context_mut();
1349        let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
1350        self.spawn_with_accessor(store, accessor, task)
1351    }
1352
1353    /// Internal implementation of `spawn` functions where a `store` is
1354    /// available along with an `Accessor`.
1355    fn spawn_with_accessor<T, D>(
1356        self,
1357        mut store: StoreContextMut<T>,
1358        accessor: Accessor<T, D>,
1359        task: impl AccessorTask<T, D, Result<()>>,
1360    ) -> AbortHandle
1361    where
1362        T: 'static,
1363        D: HasData + ?Sized,
1364    {
1365        let store = store.as_context_mut();
1366
1367        // Create an "abortable future" here where internally the future will
1368        // hook calls to poll and possibly spawn more background tasks on each
1369        // iteration.
1370        let (handle, future) =
1371            AbortHandle::run(async move { HostTaskOutput::Result(task.run(&accessor).await) });
1372        self.concurrent_state_mut(store.0)
1373            .push_future(Box::pin(async move {
1374                future.await.unwrap_or(HostTaskOutput::Result(Ok(())))
1375            }));
1376
1377        handle
1378    }
1379
1380    /// Run this instance's event loop.
1381    ///
1382    /// The returned future will resolve when either the specified future
1383    /// completes (in which case we return its result) or no further progress
1384    /// can be made (in which case we trap with `Trap::AsyncDeadlock`).
1385    async fn poll_until<T, R>(
1386        self,
1387        store: StoreContextMut<'_, T>,
1388        mut future: Pin<&mut impl Future<Output = R>>,
1389    ) -> Result<R> {
1390        loop {
1391            // Take `ConcurrentState::futures` out of the instance so we can
1392            // poll it while also safely giving any of the futures inside access
1393            // to `self`.
1394            let mut futures = self
1395                .concurrent_state_mut(store.0)
1396                .futures
1397                .get_mut()
1398                .unwrap()
1399                .take()
1400                .unwrap();
1401            let mut next = pin!(futures.next());
1402
1403            let result = future::poll_fn(|cx| {
1404                // First, poll the future we were passed as an argument and
1405                // return immediately if it's ready.
1406                if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
1407                    return Poll::Ready(Ok(Either::Left(value)));
1408                }
1409
1410                // Next, poll `ConcurrentState::futures` (which includes any
1411                // pending host tasks and/or background tasks), returning
1412                // immediately if one of them fails.
1413                let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
1414                    Poll::Ready(Some(output)) => {
1415                        match output {
1416                            HostTaskOutput::Result(Err(e)) => return Poll::Ready(Err(e)),
1417                            HostTaskOutput::Result(Ok(())) => {}
1418                            HostTaskOutput::Function(fun) => {
1419                                // Defer calling this function to a worker fiber
1420                                // in case it involves calling a guest realloc
1421                                // function as part of a lowering operation.
1422                                //
1423                                // TODO: This isn't necessary for _all_
1424                                // `HostOutput::Function`s, so we could optimize
1425                                // by adding another variant to `HostOutput` to
1426                                // distinguish which ones need it and which
1427                                // don't.
1428                                self.concurrent_state_mut(store.0)
1429                                    .push_high_priority(WorkItem::WorkerFunction(Mutex::new(fun)))
1430                            }
1431                        }
1432                        Poll::Ready(true)
1433                    }
1434                    Poll::Ready(None) => Poll::Ready(false),
1435                    Poll::Pending => Poll::Pending,
1436                };
1437
1438                let mut instance = self.id().get_mut(store.0);
1439
1440                // Next, check the "high priority" work queue and return
1441                // immediately if it has at least one item.
1442                let state = instance.as_mut().concurrent_state_mut();
1443                let ready = mem::take(&mut state.high_priority);
1444                let ready = if ready.is_empty() {
1445                    // Next, check the "low priority" work queue and return
1446                    // immediately if it has at least one item.
1447                    let ready = mem::take(&mut state.low_priority);
1448                    if ready.is_empty() {
1449                        return match next {
1450                            Poll::Ready(true) => {
1451                                // In this case, one of the futures in
1452                                // `ConcurrentState::futures` completed
1453                                // successfully, so we return now and continue
1454                                // the outer loop in case there is another one
1455                                // ready to complete.
1456                                Poll::Ready(Ok(Either::Right(Vec::new())))
1457                            }
1458                            Poll::Ready(false) => {
1459                                // Poll the future we were passed one last time
1460                                // in case one of `ConcurrentState::futures` had
1461                                // the side effect of unblocking it.
1462                                if let Poll::Ready(value) =
1463                                    self.set_tls(store.0, || future.as_mut().poll(cx))
1464                                {
1465                                    Poll::Ready(Ok(Either::Left(value)))
1466                                } else {
1467                                    // In this case, there are no more pending
1468                                    // futures in `ConcurrentState::futures`,
1469                                    // there are no remaining work items, _and_
1470                                    // the future we were passed as an argument
1471                                    // still hasn't completed, meaning we're
1472                                    // stuck, so we return an error.  The
1473                                    // underlying assumption is that `future`
1474                                    // depends on this component instance making
1475                                    // such progress, and thus there's no point
1476                                    // in continuing to poll it given we've run
1477                                    // out of work to do.
1478                                    //
1479                                    // Note that we'd also reach this point if
1480                                    // the host embedder passed e.g. a
1481                                    // `std::future::Pending` to
1482                                    // `Instance::run_concurrent`, in which case
1483                                    // we'd return a "deadlock" error even when
1484                                    // any and all tasks have completed
1485                                    // normally.  However, that's not how
1486                                    // `Instance::run_concurrent` is intended
1487                                    // (and documented) to be used, so it seems
1488                                    // reasonable to lump that case in with
1489                                    // "real" deadlocks.
1490                                    //
1491                                    // TODO: Once we've added host APIs for
1492                                    // cancelling in-progress tasks, we can
1493                                    // return some other, non-error value here,
1494                                    // treating it as "normal" and giving the
1495                                    // host embedder a chance to intervene by
1496                                    // cancelling one or more tasks and/or
1497                                    // starting new tasks capable of waking the
1498                                    // existing ones.
1499                                    Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1500                                }
1501                            }
1502                            // There is at least one pending future in
1503                            // `ConcurrentState::futures` and we have nothing
1504                            // else to do but wait for now, so we return
1505                            // `Pending`.
1506                            Poll::Pending => Poll::Pending,
1507                        };
1508                    } else {
1509                        ready
1510                    }
1511                } else {
1512                    ready
1513                };
1514
1515                Poll::Ready(Ok(Either::Right(ready)))
1516            })
1517            .await;
1518
1519            // Put the `ConcurrentState::futures` back into the instance before
1520            // we return or handle any work items since one or more of those
1521            // items might append more futures.
1522            *self
1523                .concurrent_state_mut(store.0)
1524                .futures
1525                .get_mut()
1526                .unwrap() = Some(futures);
1527
1528            match result? {
1529                // The future we were passed as an argument completed, so we
1530                // return the result.
1531                Either::Left(value) => break Ok(value),
1532                // The future we were passed has not yet completed, so handle
1533                // any work items and then loop again.
1534                Either::Right(ready) => {
1535                    for item in ready {
1536                        self.handle_work_item(store.0.traitobj_mut(), item).await?;
1537                    }
1538                }
1539            }
1540        }
1541    }
1542
1543    /// Handle the specified work item, possibly resuming a fiber if applicable.
1544    async fn handle_work_item(self, store: &mut StoreOpaque, item: WorkItem) -> Result<()> {
1545        log::trace!("handle work item {item:?}");
1546        match item {
1547            WorkItem::PushFuture(future) => {
1548                self.concurrent_state_mut(store)
1549                    .futures
1550                    .get_mut()
1551                    .unwrap()
1552                    .as_mut()
1553                    .unwrap()
1554                    .push(future.into_inner().unwrap());
1555            }
1556            WorkItem::ResumeFiber(fiber) => {
1557                self.resume_fiber(store, fiber).await?;
1558            }
1559            WorkItem::GuestCall(call) => {
1560                let state = self.concurrent_state_mut(store);
1561                if call.is_ready(state)? {
1562                    self.run_on_worker(store, WorkerItem::GuestCall(call))
1563                        .await?;
1564                } else {
1565                    let task = state.get_mut(call.task)?;
1566                    if !task.starting_sent {
1567                        task.starting_sent = true;
1568                        if let GuestCallKind::Start(_) = &call.kind {
1569                            Waitable::Guest(call.task).set_event(
1570                                state,
1571                                Some(Event::Subtask {
1572                                    status: Status::Starting,
1573                                }),
1574                            )?;
1575                        }
1576                    }
1577
1578                    let runtime_instance = state.get(call.task)?.instance;
1579                    state
1580                        .instance_state(runtime_instance)
1581                        .pending
1582                        .insert(call.task, call.kind);
1583                }
1584            }
1585            WorkItem::Poll(params) => {
1586                let state = self.concurrent_state_mut(store);
1587                if state.get_mut(params.task)?.event.is_some()
1588                    || !state.get_mut(params.set)?.ready.is_empty()
1589                {
1590                    // There's at least one event immediately available; deliver
1591                    // it to the guest ASAP.
1592                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1593                        task: params.task,
1594                        kind: GuestCallKind::DeliverEvent {
1595                            instance: params.instance,
1596                            set: Some(params.set),
1597                        },
1598                    }));
1599                } else {
1600                    // There are no events immediately available; deliver
1601                    // `Event::None` to the guest.
1602                    state.get_mut(params.task)?.event = Some(Event::None);
1603                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1604                        task: params.task,
1605                        kind: GuestCallKind::DeliverEvent {
1606                            instance: params.instance,
1607                            set: Some(params.set),
1608                        },
1609                    }));
1610                }
1611            }
1612            WorkItem::WorkerFunction(fun) => {
1613                self.run_on_worker(store, WorkerItem::Function(fun)).await?;
1614            }
1615        }
1616
1617        Ok(())
1618    }
1619
1620    /// Resume the specified fiber, giving it exclusive access to the specified
1621    /// store.
1622    async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> {
1623        let old_task = self.concurrent_state_mut(store).guest_task;
1624        log::trace!("resume_fiber: save current task {old_task:?}");
1625
1626        let fiber = fiber::resolve_or_release(store, fiber).await?;
1627
1628        let state = self.concurrent_state_mut(store);
1629
1630        state.guest_task = old_task;
1631        log::trace!("resume_fiber: restore current task {old_task:?}");
1632
1633        if let Some(mut fiber) = fiber {
1634            // See the `SuspendReason` documentation for what each case means.
1635            match state.suspend_reason.take().unwrap() {
1636                SuspendReason::NeedWork => {
1637                    if state.worker.is_none() {
1638                        state.worker = Some(fiber);
1639                    } else {
1640                        fiber.dispose(store);
1641                    }
1642                }
1643                SuspendReason::Yielding { .. } => {
1644                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1645                }
1646                SuspendReason::Waiting { set, task } => {
1647                    let old = state
1648                        .get_mut(set)?
1649                        .waiting
1650                        .insert(task, WaitMode::Fiber(fiber));
1651                    assert!(old.is_none());
1652                }
1653            }
1654        }
1655
1656        Ok(())
1657    }
1658
1659    /// Execute the specified guest call on a worker fiber.
1660    async fn run_on_worker(self, store: &mut StoreOpaque, item: WorkerItem) -> Result<()> {
1661        let worker = if let Some(fiber) = self.concurrent_state_mut(store).worker.take() {
1662            fiber
1663        } else {
1664            fiber::make_fiber(store.traitobj_mut(), move |store| {
1665                loop {
1666                    match self.concurrent_state_mut(store).worker_item.take().unwrap() {
1667                        WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
1668                        WorkerItem::Function(fun) => fun.into_inner().unwrap()(store, self)?,
1669                    }
1670
1671                    self.suspend(store, SuspendReason::NeedWork)?;
1672                }
1673            })?
1674        };
1675
1676        let worker_item = &mut self.concurrent_state_mut(store).worker_item;
1677        assert!(worker_item.is_none());
1678        *worker_item = Some(item);
1679
1680        self.resume_fiber(store, worker).await
1681    }
1682
1683    /// Execute the specified guest call.
1684    fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
1685        match call.kind {
1686            GuestCallKind::DeliverEvent {
1687                instance: runtime_instance,
1688                set,
1689            } => {
1690                let state = self.concurrent_state_mut(store);
1691                let (event, waitable) = state.get_event(call.task, runtime_instance, set)?.unwrap();
1692                let task = state.get_mut(call.task)?;
1693                let runtime_instance = task.instance;
1694                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
1695
1696                log::trace!(
1697                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
1698                    call.task,
1699                );
1700
1701                let old_task = state.guest_task.replace(call.task);
1702                log::trace!(
1703                    "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
1704                    call.task
1705                );
1706
1707                self.maybe_push_call_context(store.store_opaque_mut(), call.task)?;
1708
1709                let state = self.concurrent_state_mut(store);
1710                state.enter_instance(runtime_instance);
1711
1712                let callback = state.get_mut(call.task)?.callback.take().unwrap();
1713
1714                let code = callback(store, self, runtime_instance, event, handle)?;
1715
1716                let state = self.concurrent_state_mut(store);
1717
1718                state.get_mut(call.task)?.callback = Some(callback);
1719
1720                state.exit_instance(runtime_instance)?;
1721
1722                self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?;
1723
1724                let state = self.concurrent_state_mut(store);
1725                state.handle_callback_code(call.task, runtime_instance, code, false)?;
1726
1727                state.guest_task = old_task;
1728                log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
1729            }
1730            GuestCallKind::Start(fun) => {
1731                fun(store, self)?;
1732            }
1733        }
1734
1735        Ok(())
1736    }
1737
1738    /// Suspend the current fiber, storing the reason in
1739    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1740    /// it should be resumed.
1741    ///
1742    /// See the `SuspendReason` documentation for details.
1743    fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> {
1744        log::trace!("suspend fiber: {reason:?}");
1745
1746        // If we're yielding or waiting on behalf of a guest task, we'll need to
1747        // pop the call context which manages resource borrows before suspending
1748        // and then push it again once we've resumed.
1749        let task = match &reason {
1750            SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
1751            SuspendReason::NeedWork => None,
1752        };
1753
1754        let old_guest_task = if let Some(task) = task {
1755            self.maybe_pop_call_context(store.store_opaque_mut(), task)?;
1756            self.concurrent_state_mut(store).guest_task
1757        } else {
1758            None
1759        };
1760
1761        let suspend_reason = &mut self.concurrent_state_mut(store).suspend_reason;
1762        assert!(suspend_reason.is_none());
1763        *suspend_reason = Some(reason);
1764
1765        store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1766
1767        if let Some(task) = task {
1768            self.concurrent_state_mut(store).guest_task = old_guest_task;
1769            self.maybe_push_call_context(store.store_opaque_mut(), task)?;
1770        }
1771
1772        Ok(())
1773    }
1774
1775    /// Push the call context for managing resource borrows for the specified
1776    /// guest task if it has not yet either returned a result or cancelled
1777    /// itself.
1778    fn maybe_push_call_context(
1779        self,
1780        store: &mut StoreOpaque,
1781        guest_task: TableId<GuestTask>,
1782    ) -> Result<()> {
1783        let task = self.concurrent_state_mut(store).get_mut(guest_task)?;
1784        if task.lift_result.is_some() {
1785            log::trace!("push call context for {guest_task:?}");
1786            let call_context = task.call_context.take().unwrap();
1787            store.component_resource_state().0.push(call_context);
1788        }
1789        Ok(())
1790    }
1791
1792    /// Pop the call context for managing resource borrows for the specified
1793    /// guest task if it has not yet either returned a result or cancelled
1794    /// itself.
1795    fn maybe_pop_call_context(
1796        self,
1797        store: &mut StoreOpaque,
1798        guest_task: TableId<GuestTask>,
1799    ) -> Result<()> {
1800        if self
1801            .concurrent_state_mut(store)
1802            .get(guest_task)?
1803            .lift_result
1804            .is_some()
1805        {
1806            log::trace!("pop call context for {guest_task:?}");
1807            let call_context = Some(store.component_resource_state().0.pop().unwrap());
1808            self.concurrent_state_mut(store)
1809                .get_mut(guest_task)?
1810                .call_context = call_context;
1811        }
1812        Ok(())
1813    }
1814
1815    /// Add the specified guest call to the "high priority" work item queue, to
1816    /// be started as soon as backpressure and/or reentrance rules allow.
1817    ///
1818    /// SAFETY: The raw pointer arguments must be valid references to guest
1819    /// functions (with the appropriate signatures) when the closures queued by
1820    /// this function are called.
1821    unsafe fn queue_call<T: 'static>(
1822        self,
1823        mut store: StoreContextMut<T>,
1824        guest_task: TableId<GuestTask>,
1825        callee: SendSyncPtr<VMFuncRef>,
1826        param_count: usize,
1827        result_count: usize,
1828        flags: Option<InstanceFlags>,
1829        async_: bool,
1830        callback: Option<SendSyncPtr<VMFuncRef>>,
1831        post_return: Option<SendSyncPtr<VMFuncRef>>,
1832    ) -> Result<()> {
1833        /// Return a closure which will call the specified function in the scope
1834        /// of the specified task.
1835        ///
1836        /// This will use `GuestTask::lower_params` to lower the parameters, but
1837        /// will not lift the result; instead, it returns a
1838        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1839        /// any, may be lifted.  Note that an async-lifted export will have
1840        /// returned its result using the `task.return` intrinsic (or not
1841        /// returned a result at all, in the case of `task.cancel`), in which
1842        /// case the "result" of this call will either be a callback code or
1843        /// nothing.
1844        ///
1845        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1846        /// the returned closure is called.
1847        unsafe fn make_call<T: 'static>(
1848            store: StoreContextMut<T>,
1849            guest_task: TableId<GuestTask>,
1850            callee: SendSyncPtr<VMFuncRef>,
1851            param_count: usize,
1852            result_count: usize,
1853            flags: Option<InstanceFlags>,
1854        ) -> impl FnOnce(
1855            &mut dyn VMStore,
1856            Instance,
1857        ) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1858        + Send
1859        + Sync
1860        + 'static
1861        + use<T> {
1862            let token = StoreToken::new(store);
1863            move |store: &mut dyn VMStore, instance: Instance| {
1864                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1865                let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1866                let may_enter_after_call = task.call_post_return_automatically();
1867                let lower = task.lower_params.take().unwrap();
1868
1869                lower(store, instance, &mut storage[..param_count])?;
1870
1871                let mut store = token.as_context_mut(store);
1872
1873                // SAFETY: Per the contract documented in `make_call's`
1874                // documentation, `callee` must be a valid pointer.
1875                unsafe {
1876                    if let Some(mut flags) = flags {
1877                        flags.set_may_enter(false);
1878                    }
1879                    crate::Func::call_unchecked_raw(
1880                        &mut store,
1881                        callee.as_non_null(),
1882                        NonNull::new(
1883                            &mut storage[..param_count.max(result_count)]
1884                                as *mut [MaybeUninit<ValRaw>] as _,
1885                        )
1886                        .unwrap(),
1887                    )?;
1888                    if let Some(mut flags) = flags {
1889                        flags.set_may_enter(may_enter_after_call);
1890                    }
1891                }
1892
1893                Ok(storage)
1894            }
1895        }
1896
1897        // SAFETY: Per the contract described in this function documentation,
1898        // the `callee` pointer which `call` closes over must be valid when
1899        // called by the closure we queue below.
1900        let call = unsafe {
1901            make_call(
1902                store.as_context_mut(),
1903                guest_task,
1904                callee,
1905                param_count,
1906                result_count,
1907                flags,
1908            )
1909        };
1910
1911        let callee_instance = self.concurrent_state_mut(store.0).get(guest_task)?.instance;
1912        let fun = if callback.is_some() {
1913            assert!(async_);
1914
1915            Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1916                let old_task = instance
1917                    .concurrent_state_mut(store)
1918                    .guest_task
1919                    .replace(guest_task);
1920                log::trace!(
1921                    "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
1922                );
1923
1924                instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1925
1926                instance
1927                    .concurrent_state_mut(store)
1928                    .enter_instance(callee_instance);
1929
1930                // SAFETY: See the documentation for `make_call` to review the
1931                // contract we must uphold for `call` here.
1932                //
1933                // Per the contract described in the `queue_call`
1934                // documentation, the `callee` pointer which `call` closes
1935                // over must be valid.
1936                let storage = call(store, instance)?;
1937
1938                instance
1939                    .concurrent_state_mut(store)
1940                    .exit_instance(callee_instance)?;
1941
1942                instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1943
1944                let state = instance.concurrent_state_mut(store);
1945                state.guest_task = old_task;
1946                log::trace!("stackless call: restored {old_task:?} as current task");
1947
1948                // SAFETY: `wasmparser` will have validated that the callback
1949                // function returns a `i32` result.
1950                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1951
1952                state.handle_callback_code(guest_task, callee_instance, code, true)?;
1953
1954                Ok(())
1955            })
1956                as Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>
1957        } else {
1958            let token = StoreToken::new(store.as_context_mut());
1959            Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1960                let old_task = instance
1961                    .concurrent_state_mut(store)
1962                    .guest_task
1963                    .replace(guest_task);
1964                log::trace!(
1965                    "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
1966                );
1967
1968                let mut flags = instance.id().get(store).instance_flags(callee_instance);
1969
1970                instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1971
1972                // Unless this is a callback-less (i.e. stackful)
1973                // async-lifted export, we need to record that the instance
1974                // cannot be entered until the call returns.
1975                if !async_ {
1976                    instance
1977                        .concurrent_state_mut(store)
1978                        .enter_instance(callee_instance);
1979                }
1980
1981                // SAFETY: See the documentation for `make_call` to review the
1982                // contract we must uphold for `call` here.
1983                //
1984                // Per the contract described in the `queue_call`
1985                // documentation, the `callee` pointer which `call` closes
1986                // over must be valid.
1987                let storage = call(store, instance)?;
1988
1989                if async_ {
1990                    // This is a callback-less (i.e. stackful) async-lifted
1991                    // export, so there is no post-return function, and
1992                    // either `task.return` or `task.cancel` should have
1993                    // been called.
1994                    if instance
1995                        .concurrent_state_mut(store)
1996                        .get(guest_task)?
1997                        .lift_result
1998                        .is_some()
1999                    {
2000                        return Err(anyhow!(crate::Trap::NoAsyncResult));
2001                    }
2002                } else {
2003                    // This is a sync-lifted export, so now is when we lift the
2004                    // result, optionally call the post-return function, if any,
2005                    // and finally notify any current or future waiters that the
2006                    // subtask has returned.
2007
2008                    let lift = {
2009                        let state = instance.concurrent_state_mut(store);
2010                        state.exit_instance(callee_instance)?;
2011
2012                        assert!(state.get(guest_task)?.result.is_none());
2013
2014                        state.get_mut(guest_task)?.lift_result.take().unwrap()
2015                    };
2016
2017                    // SAFETY: `result_count` represents the number of core Wasm
2018                    // results returned, per `wasmparser`.
2019                    let result = (lift.lift)(store, instance, unsafe {
2020                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2021                            &storage[..result_count],
2022                        )
2023                    })?;
2024
2025                    let post_return_arg = match result_count {
2026                        0 => ValRaw::i32(0),
2027                        // SAFETY: `result_count` represents the number of
2028                        // core Wasm results returned, per `wasmparser`.
2029                        1 => unsafe { storage[0].assume_init() },
2030                        _ => unreachable!(),
2031                    };
2032
2033                    if instance
2034                        .concurrent_state_mut(store)
2035                        .get(guest_task)?
2036                        .call_post_return_automatically()
2037                    {
2038                        unsafe { flags.set_needs_post_return(false) }
2039
2040                        if let Some(func) = post_return {
2041                            let mut store = token.as_context_mut(store);
2042
2043                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
2044                            // either `wasmtime-cranelift`-generated fused adapter
2045                            // code or `component::Options`.  Per `wasmparser`
2046                            // post-return signature validation, we know it takes a
2047                            // single parameter.
2048                            unsafe {
2049                                crate::Func::call_unchecked_raw(
2050                                    &mut store,
2051                                    func.as_non_null(),
2052                                    slice::from_ref(&post_return_arg).into(),
2053                                )?;
2054                            }
2055                        }
2056
2057                        unsafe { flags.set_may_enter(true) }
2058                    }
2059
2060                    instance.task_complete(
2061                        store,
2062                        guest_task,
2063                        result,
2064                        Status::Returned,
2065                        post_return_arg,
2066                    )?;
2067                }
2068
2069                instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
2070
2071                let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
2072
2073                match &task.caller {
2074                    Caller::Host {
2075                        remove_task_automatically,
2076                        ..
2077                    } => {
2078                        if *remove_task_automatically {
2079                            Waitable::Guest(guest_task)
2080                                .delete_from(instance.concurrent_state_mut(store))?;
2081                        }
2082                    }
2083                    Caller::Guest { .. } => {
2084                        task.exited = true;
2085                    }
2086                }
2087
2088                Ok(())
2089            })
2090        };
2091
2092        self.concurrent_state_mut(store.0)
2093            .push_high_priority(WorkItem::GuestCall(GuestCall {
2094                task: guest_task,
2095                kind: GuestCallKind::Start(fun),
2096            }));
2097
2098        Ok(())
2099    }
2100
2101    /// Prepare (but do not start) a guest->guest call.
2102    ///
2103    /// This is called from fused adapter code generated in
2104    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2105    /// are synthesized Wasm functions which move the parameters from the caller
2106    /// to the callee and the result from the callee to the caller,
2107    /// respectively.  The adapter will call `Self::start_call` immediately
2108    /// after calling this function.
2109    ///
2110    /// SAFETY: All the pointer arguments must be valid pointers to guest
2111    /// entities (and with the expected signatures for the function references
2112    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2113    unsafe fn prepare_call<T: 'static>(
2114        self,
2115        mut store: StoreContextMut<T>,
2116        start: *mut VMFuncRef,
2117        return_: *mut VMFuncRef,
2118        caller_instance: RuntimeComponentInstanceIndex,
2119        callee_instance: RuntimeComponentInstanceIndex,
2120        task_return_type: TypeTupleIndex,
2121        memory: *mut VMMemoryDefinition,
2122        string_encoding: u8,
2123        caller_info: CallerInfo,
2124    ) -> Result<()> {
2125        enum ResultInfo {
2126            Heap { results: u32 },
2127            Stack { result_count: u32 },
2128        }
2129
2130        let result_info = match &caller_info {
2131            CallerInfo::Async {
2132                has_result: true,
2133                params,
2134            } => ResultInfo::Heap {
2135                results: params.last().unwrap().get_u32(),
2136            },
2137            CallerInfo::Async {
2138                has_result: false, ..
2139            } => ResultInfo::Stack { result_count: 0 },
2140            CallerInfo::Sync {
2141                result_count,
2142                params,
2143            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2144                results: params.last().unwrap().get_u32(),
2145            },
2146            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2147                result_count: *result_count,
2148            },
2149        };
2150
2151        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2152
2153        // Create a new guest task for the call, closing over the `start` and
2154        // `return_` functions to lift the parameters and lower the result,
2155        // respectively.
2156        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2157        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2158        let token = StoreToken::new(store.as_context_mut());
2159        let state = self.concurrent_state_mut(store.0);
2160        let old_task = state.guest_task.take();
2161        let new_task = GuestTask::new(
2162            state,
2163            Box::new(move |store, instance, dst| {
2164                let mut store = token.as_context_mut(store);
2165                assert!(dst.len() <= MAX_FLAT_PARAMS);
2166                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2167                let count = match caller_info {
2168                    // Async callers, if they have a result, use the last
2169                    // parameter as a return pointer so chop that off if
2170                    // relevant here.
2171                    CallerInfo::Async { params, has_result } => {
2172                        let params = &params[..params.len() - usize::from(has_result)];
2173                        for (param, src) in params.iter().zip(&mut src) {
2174                            src.write(*param);
2175                        }
2176                        params.len()
2177                    }
2178
2179                    // Sync callers forward everything directly.
2180                    CallerInfo::Sync { params, .. } => {
2181                        for (param, src) in params.iter().zip(&mut src) {
2182                            src.write(*param);
2183                        }
2184                        params.len()
2185                    }
2186                };
2187                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2188                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2189                // how it was constructed (see
2190                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2191                // for details) we know it takes count parameters and returns
2192                // `dst.len()` results.
2193                unsafe {
2194                    crate::Func::call_unchecked_raw(
2195                        &mut store,
2196                        start.as_non_null(),
2197                        NonNull::new(
2198                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2199                        )
2200                        .unwrap(),
2201                    )?;
2202                }
2203                dst.copy_from_slice(&src[..dst.len()]);
2204                let state = instance.concurrent_state_mut(store.0);
2205                let task = state.guest_task.unwrap();
2206                Waitable::Guest(task).set_event(
2207                    state,
2208                    Some(Event::Subtask {
2209                        status: Status::Started,
2210                    }),
2211                )?;
2212                Ok(())
2213            }),
2214            LiftResult {
2215                lift: Box::new(move |store, instance, src| {
2216                    // SAFETY: See comment in closure passed as `lower_params`
2217                    // parameter above.
2218                    let mut store = token.as_context_mut(store);
2219                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2220                    if let ResultInfo::Heap { results } = &result_info {
2221                        my_src.push(ValRaw::u32(*results));
2222                    }
2223                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2224                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2225                    // on how it was constructed (see
2226                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2227                    // for details) we know it takes `src.len()` parameters and
2228                    // returns up to 1 result.
2229                    unsafe {
2230                        crate::Func::call_unchecked_raw(
2231                            &mut store,
2232                            return_.as_non_null(),
2233                            my_src.as_mut_slice().into(),
2234                        )?;
2235                    }
2236                    let state = instance.concurrent_state_mut(store.0);
2237                    let task = state.guest_task.unwrap();
2238                    if sync_caller {
2239                        state.get_mut(task)?.sync_result =
2240                            Some(if let ResultInfo::Stack { result_count } = &result_info {
2241                                match result_count {
2242                                    0 => None,
2243                                    1 => Some(my_src[0]),
2244                                    _ => unreachable!(),
2245                                }
2246                            } else {
2247                                None
2248                            });
2249                    }
2250                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2251                }),
2252                ty: task_return_type,
2253                memory: NonNull::new(memory).map(SendSyncPtr::new),
2254                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2255            },
2256            Caller::Guest {
2257                task: old_task.unwrap(),
2258                instance: caller_instance,
2259            },
2260            None,
2261            callee_instance,
2262        )?;
2263
2264        let guest_task = state.push(new_task)?;
2265
2266        if let Some(old_task) = old_task {
2267            if !state.may_enter(guest_task) {
2268                bail!(crate::Trap::CannotEnterComponent);
2269            }
2270
2271            state.get_mut(old_task)?.subtasks.insert(guest_task);
2272        };
2273
2274        // Make the new task the current one so that `Self::start_call` knows
2275        // which one to start.
2276        state.guest_task = Some(guest_task);
2277        log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2278
2279        Ok(())
2280    }
2281
2282    /// Call the specified callback function for an async-lifted export.
2283    ///
2284    /// SAFETY: `function` must be a valid reference to a guest function of the
2285    /// correct signature for a callback.
2286    unsafe fn call_callback<T>(
2287        self,
2288        mut store: StoreContextMut<T>,
2289        callee_instance: RuntimeComponentInstanceIndex,
2290        function: SendSyncPtr<VMFuncRef>,
2291        event: Event,
2292        handle: u32,
2293        may_enter_after_call: bool,
2294    ) -> Result<u32> {
2295        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2296
2297        let (ordinal, result) = event.parts();
2298        let params = &mut [
2299            ValRaw::u32(ordinal),
2300            ValRaw::u32(handle),
2301            ValRaw::u32(result),
2302        ];
2303        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2304        // `wasmtime-cranelift`-generated fused adapter code or
2305        // `component::Options`.  Per `wasmparser` callback signature
2306        // validation, we know it takes three parameters and returns one.
2307        unsafe {
2308            flags.set_may_enter(false);
2309            crate::Func::call_unchecked_raw(
2310                &mut store,
2311                function.as_non_null(),
2312                params.as_mut_slice().into(),
2313            )?;
2314            flags.set_may_enter(may_enter_after_call);
2315        }
2316        Ok(params[0].get_u32())
2317    }
2318
2319    /// Start a guest->guest call previously prepared using
2320    /// `Self::prepare_call`.
2321    ///
2322    /// This is called from fused adapter code generated in
2323    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2324    /// this function immediately after calling `Self::prepare_call`.
2325    ///
2326    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2327    /// functions with the appropriate signatures for the current guest task.
2328    /// If this is a call to an async-lowered import, the actual call may be
2329    /// deferred and run after this function returns, in which case the pointer
2330    /// arguments must also be valid when the call happens.
2331    unsafe fn start_call<T: 'static>(
2332        self,
2333        mut store: StoreContextMut<T>,
2334        callback: *mut VMFuncRef,
2335        post_return: *mut VMFuncRef,
2336        callee: *mut VMFuncRef,
2337        param_count: u32,
2338        result_count: u32,
2339        flags: u32,
2340        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2341    ) -> Result<u32> {
2342        let token = StoreToken::new(store.as_context_mut());
2343        let async_caller = storage.is_none();
2344        let state = self.concurrent_state_mut(store.0);
2345        let guest_task = state.guest_task.unwrap();
2346        let may_enter_after_call = state.get(guest_task)?.call_post_return_automatically();
2347        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2348        let param_count = usize::try_from(param_count).unwrap();
2349        assert!(param_count <= MAX_FLAT_PARAMS);
2350        let result_count = usize::try_from(result_count).unwrap();
2351        assert!(result_count <= MAX_FLAT_RESULTS);
2352
2353        let task = state.get_mut(guest_task)?;
2354        if !callback.is_null() {
2355            // We're calling an async-lifted export with a callback, so store
2356            // the callback and related context as part of the task so we can
2357            // call it later when needed.
2358            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2359            task.callback = Some(Box::new(
2360                move |store, instance, runtime_instance, event, handle| {
2361                    let store = token.as_context_mut(store);
2362                    unsafe {
2363                        instance.call_callback::<T>(
2364                            store,
2365                            runtime_instance,
2366                            callback,
2367                            event,
2368                            handle,
2369                            may_enter_after_call,
2370                        )
2371                    }
2372                },
2373            ));
2374        }
2375
2376        let Caller::Guest {
2377            task: caller,
2378            instance: runtime_instance,
2379        } = &task.caller
2380        else {
2381            // As of this writing, `start_call` is only used for guest->guest
2382            // calls.
2383            unreachable!()
2384        };
2385        let caller = *caller;
2386        let caller_instance = *runtime_instance;
2387
2388        let callee_instance = task.instance;
2389
2390        let instance_flags = if callback.is_null() {
2391            None
2392        } else {
2393            Some(self.id().get(store.0).instance_flags(callee_instance))
2394        };
2395
2396        // Queue the call as a "high priority" work item.
2397        unsafe {
2398            self.queue_call(
2399                store.as_context_mut(),
2400                guest_task,
2401                callee,
2402                param_count,
2403                result_count,
2404                instance_flags,
2405                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2406                NonNull::new(callback).map(SendSyncPtr::new),
2407                NonNull::new(post_return).map(SendSyncPtr::new),
2408            )?;
2409        }
2410
2411        let state = self.concurrent_state_mut(store.0);
2412
2413        // Use the caller's `GuestTask::sync_call_set` to register interest in
2414        // the subtask...
2415        let guest_waitable = Waitable::Guest(guest_task);
2416        let old_set = guest_waitable.common(state)?.set;
2417        let set = state.get_mut(caller)?.sync_call_set;
2418        guest_waitable.join(state, Some(set))?;
2419
2420        // ... and suspend this fiber temporarily while we wait for it to start.
2421        //
2422        // Note that we _could_ call the callee directly using the current fiber
2423        // rather than suspend this one, but that would make reasoning about the
2424        // event loop more complicated and is probably only worth doing if
2425        // there's a measurable performance benefit.  In addition, it would mean
2426        // blocking the caller if the callee calls a blocking sync-lowered
2427        // import, and as of this writing the spec says we must not do that.
2428        //
2429        // Alternatively, the fused adapter code could be modified to call the
2430        // callee directly without calling a host-provided intrinsic at all (in
2431        // which case it would need to do its own, inline backpressure checks,
2432        // etc.).  Again, we'd want to see a measurable performance benefit
2433        // before committing to such an optimization.  And again, we'd need to
2434        // update the spec to allow that.
2435        let (status, waitable) = loop {
2436            self.suspend(
2437                store.0.traitobj_mut(),
2438                SuspendReason::Waiting { set, task: caller },
2439            )?;
2440
2441            let state = self.concurrent_state_mut(store.0);
2442
2443            let event = Waitable::Guest(guest_task).take_event(state)?;
2444            let Some(Event::Subtask { status }) = event else {
2445                unreachable!();
2446            };
2447
2448            log::trace!("status {status:?} for {guest_task:?}");
2449
2450            if status == Status::Returned {
2451                // It returned, so we can stop waiting.
2452                break (status, None);
2453            } else if async_caller {
2454                // It hasn't returned yet, but the caller is calling via an
2455                // async-lowered import, so we generate a handle for the task
2456                // waitable and return the status.
2457                break (
2458                    status,
2459                    Some(
2460                        state.waitable_tables[caller_instance]
2461                            .insert(guest_task.rep(), WaitableState::GuestTask)?,
2462                    ),
2463                );
2464            } else {
2465                // The callee hasn't returned yet, and the caller is calling via
2466                // a sync-lowered import, so we loop and keep waiting until the
2467                // callee returns.
2468            }
2469        };
2470
2471        let state = self.concurrent_state_mut(store.0);
2472
2473        guest_waitable.join(state, old_set)?;
2474
2475        if let Some(storage) = storage {
2476            // The caller used a sync-lowered import to call an async-lifted
2477            // export, in which case the result, if any, has been stashed in
2478            // `GuestTask::sync_result`.
2479            if let Some(result) = state.get_mut(guest_task)?.sync_result.take() {
2480                if let Some(result) = result {
2481                    storage[0] = MaybeUninit::new(result);
2482                }
2483
2484                Waitable::Guest(guest_task).delete_from(state)?;
2485            } else {
2486                // This means the callee failed to call either `task.return` or
2487                // `task.cancel` before exiting.
2488                return Err(anyhow!(crate::Trap::NoAsyncResult));
2489            }
2490        }
2491
2492        // Reset the current task to point to the caller as it resumes control.
2493        state.guest_task = Some(caller);
2494        log::trace!("popped current task {guest_task:?}; new task is {caller:?}");
2495
2496        Ok(status.pack(waitable))
2497    }
2498
2499    /// Wrap the specified host function in a future which will call it, passing
2500    /// it an `&Accessor<T>`.
2501    ///
2502    /// See the `Accessor` documentation for details.
2503    pub(crate) fn wrap_call<T: 'static, F, R>(
2504        self,
2505        store: StoreContextMut<T>,
2506        closure: F,
2507    ) -> impl Future<Output = Result<R>> + 'static
2508    where
2509        T: 'static,
2510        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
2511            + Send
2512            + Sync
2513            + 'static,
2514        R: Send + Sync + 'static,
2515    {
2516        let token = StoreToken::new(store);
2517        async move {
2518            let mut accessor = Accessor::new(token, Some(self));
2519            closure(&mut accessor).await
2520        }
2521    }
2522
2523    /// Poll the specified future once on behalf of a guest->host call using an
2524    /// async-lowered import.
2525    ///
2526    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2527    /// `Pending`, add it to the set of futures to be polled as part of this
2528    /// instance's event loop until it completes, and then return
2529    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2530    ///
2531    /// Whether the future returns `Ready` immediately or later, the `lower`
2532    /// function will be used to lower the result, if any, into the guest caller's
2533    /// stack and linear memory unless the task has been cancelled.
2534    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2535        self,
2536        mut store: StoreContextMut<T>,
2537        future: impl Future<Output = Result<R>> + Send + 'static,
2538        caller_instance: RuntimeComponentInstanceIndex,
2539        lower: impl FnOnce(StoreContextMut<T>, Instance, R) -> Result<()> + Send + 'static,
2540    ) -> Result<Option<u32>> {
2541        let token = StoreToken::new(store.as_context_mut());
2542        let state = self.concurrent_state_mut(store.0);
2543        let caller = state.guest_task.unwrap();
2544
2545        // Create an abortable future which hooks calls to poll and manages call
2546        // context state for the future.
2547        let (abort_handle, future) = AbortHandle::run(async move {
2548            let mut future = pin!(future);
2549            let mut call_context = None;
2550            future::poll_fn(move |cx| {
2551                // Push the call context for managing any resource borrows
2552                // for the task.
2553                tls::get(|store| {
2554                    if let Some(call_context) = call_context.take() {
2555                        token
2556                            .as_context_mut(store)
2557                            .0
2558                            .component_resource_state()
2559                            .0
2560                            .push(call_context);
2561                    }
2562                });
2563
2564                let result = future.as_mut().poll(cx);
2565
2566                if result.is_pending() {
2567                    // Pop the call context for managing any resource
2568                    // borrows for the task.
2569                    tls::get(|store| {
2570                        call_context = Some(
2571                            token
2572                                .as_context_mut(store)
2573                                .0
2574                                .component_resource_state()
2575                                .0
2576                                .pop()
2577                                .unwrap(),
2578                        );
2579                    });
2580                }
2581                result
2582            })
2583            .await
2584        });
2585
2586        // We create a new host task even though it might complete immediately
2587        // (in which case we won't need to pass a waitable back to the guest).
2588        // If it does complete immediately, we'll remove it before we return.
2589        let task = state.push(HostTask::new(caller_instance, Some(abort_handle)))?;
2590
2591        log::trace!("new host task child of {caller:?}: {task:?}");
2592        let token = StoreToken::new(store.as_context_mut());
2593
2594        // Map the output of the future to a `HostTaskOutput` responsible for
2595        // lowering the result into the guest's stack and memory, as well as
2596        // notifying any waiters that the task returned.
2597        let mut future = Box::pin(async move {
2598            let result = match future.await {
2599                Some(result) => result,
2600                // Task was cancelled; nothing left to do.
2601                None => return HostTaskOutput::Result(Ok(())),
2602            };
2603            HostTaskOutput::Function(Box::new(move |store, instance| {
2604                let mut store = token.as_context_mut(store);
2605                lower(store.as_context_mut(), instance, result?)?;
2606                let state = instance.concurrent_state_mut(store.0);
2607                state.get_mut(task)?.abort_handle.take();
2608                Waitable::Host(task).set_event(
2609                    state,
2610                    Some(Event::Subtask {
2611                        status: Status::Returned,
2612                    }),
2613                )?;
2614
2615                Ok(())
2616            }))
2617        });
2618
2619        // Finally, poll the future.  We can use a dummy `Waker` here because
2620        // we'll add the future to `ConcurrentState::futures` and poll it
2621        // automatically from the event loop if it doesn't complete immediately
2622        // here.
2623        let poll = self.set_tls(store.0, || {
2624            future
2625                .as_mut()
2626                .poll(&mut Context::from_waker(&Waker::noop()))
2627        });
2628
2629        Ok(match poll {
2630            Poll::Ready(output) => {
2631                // It finished immediately; lower the result and delete the
2632                // task.
2633                output.consume(store.0.traitobj_mut(), self)?;
2634                log::trace!("delete host task {task:?} (already ready)");
2635                self.concurrent_state_mut(store.0).delete(task)?;
2636                None
2637            }
2638            Poll::Pending => {
2639                // It hasn't finished yet; add the future to
2640                // `ConcurrentState::futures` so it will be polled by the event
2641                // loop and allocate a waitable handle to return to the guest.
2642                let state = self.concurrent_state_mut(store.0);
2643                state.push_future(future);
2644                let handle = state.waitable_tables[caller_instance]
2645                    .insert(task.rep(), WaitableState::HostTask)?;
2646                log::trace!(
2647                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2648                );
2649                Some(handle)
2650            }
2651        })
2652    }
2653
2654    /// Poll the specified future until it completes on behalf of a guest->host
2655    /// call using a sync-lowered import.
2656    ///
2657    /// This is similar to `Self::first_poll` except it's for sync-lowered
2658    /// imports, meaning we don't need to handle cancellation and we can block
2659    /// the caller until the task completes, at which point the caller can
2660    /// handle lowering the result to the guest's stack and linear memory.
2661    pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
2662        self,
2663        store: &mut dyn VMStore,
2664        future: impl Future<Output = Result<R>> + Send + 'static,
2665        caller_instance: RuntimeComponentInstanceIndex,
2666    ) -> Result<R> {
2667        let state = self.concurrent_state_mut(store);
2668
2669        // If there is no current guest task set, that means the host function
2670        // was registered using e.g. `LinkerInstance::func_wrap`, in which case
2671        // it should complete immediately.
2672        let Some(caller) = state.guest_task else {
2673            return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
2674                Poll::Ready(result) => result,
2675                Poll::Pending => {
2676                    unreachable!()
2677                }
2678            };
2679        };
2680
2681        // Save any existing result stashed in `GuestTask::result` so we can
2682        // replace it with the new result.
2683        let old_result = state
2684            .get_mut(caller)
2685            .with_context(|| format!("bad handle: {caller:?}"))?
2686            .result
2687            .take();
2688
2689        // Add a temporary host task into the table so we can track its
2690        // progress.  Note that we'll never allocate a waitable handle for the
2691        // guest since we're being called synchronously.
2692        let task = state.push(HostTask::new(caller_instance, None))?;
2693
2694        log::trace!("new host task child of {caller:?}: {task:?}");
2695
2696        // Map the output of the future to a `HostTaskOutput` which will take
2697        // care of stashing the result in `GuestTask::result` and resuming this
2698        // fiber when the host task completes.
2699        let mut future = Box::pin(future.map(move |result| {
2700            HostTaskOutput::Function(Box::new(move |store, instance| {
2701                let state = instance.concurrent_state_mut(store);
2702                state.get_mut(caller)?.result = Some(Box::new(result?) as _);
2703
2704                Waitable::Host(task).set_event(
2705                    state,
2706                    Some(Event::Subtask {
2707                        status: Status::Returned,
2708                    }),
2709                )?;
2710
2711                Ok(())
2712            }))
2713        })) as HostTaskFuture;
2714
2715        // Finally, poll the future.  We can use a dummy `Waker` here because
2716        // we'll add the future to `ConcurrentState::futures` and poll it
2717        // automatically from the event loop if it doesn't complete immediately
2718        // here.
2719        let poll = self.set_tls(store, || {
2720            future
2721                .as_mut()
2722                .poll(&mut Context::from_waker(&Waker::noop()))
2723        });
2724
2725        match poll {
2726            Poll::Ready(output) => {
2727                // It completed immediately; run the `HostTaskOutput` function
2728                // to stash the result and delete the task.
2729                output.consume(store, self)?;
2730                log::trace!("delete host task {task:?} (already ready)");
2731                self.concurrent_state_mut(store).delete(task)?;
2732            }
2733            Poll::Pending => {
2734                // It did not complete immediately; add it to
2735                // `ConcurrentState::futures` so it will be polled via the event
2736                // loop, then use `GuestTask::sync_call_set` to wait for the
2737                // task to complete, suspending the current fiber until it does
2738                // so.
2739                let state = self.concurrent_state_mut(store);
2740                state.push_future(future);
2741
2742                let set = state.get_mut(caller)?.sync_call_set;
2743                Waitable::Host(task).join(state, Some(set))?;
2744
2745                self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
2746            }
2747        }
2748
2749        // Retrieve and return the result.
2750        Ok(*mem::replace(
2751            &mut self.concurrent_state_mut(store).get_mut(caller)?.result,
2752            old_result,
2753        )
2754        .unwrap()
2755        .downcast()
2756        .unwrap())
2757    }
2758
2759    /// Implements the `task.return` intrinsic, lifting the result for the
2760    /// current guest task.
2761    pub(crate) fn task_return(
2762        self,
2763        store: &mut dyn VMStore,
2764        ty: TypeTupleIndex,
2765        options: OptionsIndex,
2766        storage: &[ValRaw],
2767    ) -> Result<()> {
2768        let state = self.concurrent_state_mut(store);
2769        let CanonicalOptions {
2770            string_encoding,
2771            data_model,
2772            ..
2773        } = *state.options(options);
2774        let guest_task = state.guest_task.unwrap();
2775        let lift = state
2776            .get_mut(guest_task)?
2777            .lift_result
2778            .take()
2779            .ok_or_else(|| {
2780                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2781            })?;
2782        assert!(state.get(guest_task)?.result.is_none());
2783
2784        let invalid = ty != lift.ty
2785            || string_encoding != lift.string_encoding
2786            || match data_model {
2787                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2788                    Some(memory) => {
2789                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2790                        let actual = self.id().get(store).runtime_memory(memory);
2791                        expected != actual
2792                    }
2793                    // Memory not specified, meaning it didn't need to be
2794                    // specified per validation, so not invalid.
2795                    None => false,
2796                },
2797                // Always invalid as this isn't supported.
2798                CanonicalOptionsDataModel::Gc { .. } => true,
2799            };
2800
2801        if invalid {
2802            bail!("invalid `task.return` signature and/or options for current task");
2803        }
2804
2805        log::trace!("task.return for {guest_task:?}");
2806
2807        let result = (lift.lift)(store, self, storage)?;
2808
2809        self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2810    }
2811
2812    /// Implements the `task.cancel` intrinsic.
2813    pub(crate) fn task_cancel(
2814        self,
2815        store: &mut dyn VMStore,
2816        _caller_instance: RuntimeComponentInstanceIndex,
2817    ) -> Result<()> {
2818        let state = self.concurrent_state_mut(store);
2819        let guest_task = state.guest_task.unwrap();
2820        let task = state.get_mut(guest_task)?;
2821        if !task.cancel_sent {
2822            bail!("`task.cancel` called by task which has not been cancelled")
2823        }
2824        _ = task.lift_result.take().ok_or_else(|| {
2825            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2826        })?;
2827
2828        assert!(task.result.is_none());
2829
2830        log::trace!("task.cancel for {guest_task:?}");
2831
2832        self.task_complete(
2833            store,
2834            guest_task,
2835            Box::new(DummyResult),
2836            Status::ReturnCancelled,
2837            ValRaw::i32(0),
2838        )
2839    }
2840
2841    /// Complete the specified guest task (i.e. indicate that it has either
2842    /// returned a (possibly empty) result or cancelled itself).
2843    ///
2844    /// This will return any resource borrows and notify any current or future
2845    /// waiters that the task has completed.
2846    fn task_complete(
2847        self,
2848        store: &mut dyn VMStore,
2849        guest_task: TableId<GuestTask>,
2850        result: Box<dyn Any + Send + Sync>,
2851        status: Status,
2852        post_return_arg: ValRaw,
2853    ) -> Result<()> {
2854        if self
2855            .concurrent_state_mut(store)
2856            .get(guest_task)?
2857            .call_post_return_automatically()
2858        {
2859            let (calls, host_table, _, instance) = store
2860                .store_opaque_mut()
2861                .component_resource_state_with_instance(self);
2862            ResourceTables {
2863                calls,
2864                host_table: Some(host_table),
2865                guest: Some(instance.guest_tables()),
2866            }
2867            .exit_call()?;
2868        } else {
2869            // As of this writing, the only scenario where `call_post_return_automatically`
2870            // would be false for a `GuestTask` is for host-to-guest calls using
2871            // `[Typed]Func::call_async`, in which case the `function_index`
2872            // should be a non-`None` value.
2873            let function_index = self
2874                .concurrent_state_mut(store)
2875                .get(guest_task)?
2876                .function_index
2877                .unwrap();
2878
2879            self.id()
2880                .get_mut(store)
2881                .post_return_arg_set(function_index, post_return_arg);
2882        }
2883
2884        let state = self.concurrent_state_mut(store);
2885        let task = state.get_mut(guest_task)?;
2886
2887        if let Caller::Host { tx, .. } = &mut task.caller {
2888            if let Some(tx) = tx.take() {
2889                _ = tx.send(result);
2890            }
2891        } else {
2892            task.result = Some(result);
2893            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2894        }
2895
2896        Ok(())
2897    }
2898
2899    /// Implements the `waitable-set.wait` intrinsic.
2900    pub(crate) fn waitable_set_wait(
2901        self,
2902        store: &mut dyn VMStore,
2903        options: OptionsIndex,
2904        set: u32,
2905        payload: u32,
2906    ) -> Result<u32> {
2907        let state = self.concurrent_state_mut(store);
2908        let opts = state.options(options);
2909        let async_ = opts.async_;
2910        let caller_instance = opts.instance;
2911        let (rep, WaitableState::Set) =
2912            state.waitable_tables[caller_instance].get_mut_by_index(set)?
2913        else {
2914            bail!("invalid waitable-set handle");
2915        };
2916
2917        self.waitable_check(
2918            store,
2919            async_,
2920            WaitableCheck::Wait(WaitableCheckParams {
2921                set: TableId::new(rep),
2922                caller_instance,
2923                options,
2924                payload,
2925            }),
2926        )
2927    }
2928
2929    /// Implements the `waitable-set.poll` intrinsic.
2930    pub(crate) fn waitable_set_poll(
2931        self,
2932        store: &mut dyn VMStore,
2933        options: OptionsIndex,
2934        set: u32,
2935        payload: u32,
2936    ) -> Result<u32> {
2937        let state = self.concurrent_state_mut(store);
2938        let opts = state.options(options);
2939        let async_ = opts.async_;
2940        let caller_instance = opts.instance;
2941        let (rep, WaitableState::Set) =
2942            state.waitable_tables[caller_instance].get_mut_by_index(set)?
2943        else {
2944            bail!("invalid waitable-set handle");
2945        };
2946
2947        self.waitable_check(
2948            store,
2949            async_,
2950            WaitableCheck::Poll(WaitableCheckParams {
2951                set: TableId::new(rep),
2952                caller_instance,
2953                options,
2954                payload,
2955            }),
2956        )
2957    }
2958
2959    /// Implements the `yield` intrinsic.
2960    pub(crate) fn yield_(self, store: &mut dyn VMStore, async_: bool) -> Result<bool> {
2961        self.waitable_check(store, async_, WaitableCheck::Yield)
2962            .map(|_code| {
2963                // TODO: plumb cancellation to here:
2964                // https://github.com/bytecodealliance/wasmtime/issues/11191
2965                false
2966            })
2967    }
2968
2969    /// Helper function for the `waitable-set.wait`, `waitable-set.poll`, and
2970    /// `yield` intrinsics.
2971    fn waitable_check(
2972        self,
2973        store: &mut dyn VMStore,
2974        async_: bool,
2975        check: WaitableCheck,
2976    ) -> Result<u32> {
2977        if async_ {
2978            bail!(
2979                "todo: async `waitable-set.wait`, `waitable-set.poll`, and `yield` not yet implemented"
2980            );
2981        }
2982
2983        let guest_task = self.concurrent_state_mut(store).guest_task.unwrap();
2984
2985        let (wait, set) = match &check {
2986            WaitableCheck::Wait(params) => (true, Some(params.set)),
2987            WaitableCheck::Poll(params) => (false, Some(params.set)),
2988            WaitableCheck::Yield => (false, None),
2989        };
2990
2991        // First, suspend this fiber, allowing any other tasks to run.
2992        self.suspend(store, SuspendReason::Yielding { task: guest_task })?;
2993
2994        log::trace!("waitable check for {guest_task:?}; set {set:?}");
2995
2996        let state = self.concurrent_state_mut(store);
2997        let task = state.get(guest_task)?;
2998
2999        if wait && task.callback.is_some() {
3000            bail!("cannot call `task.wait` from async-lifted export with callback");
3001        }
3002
3003        // If we're waiting, and there are no events immediately available,
3004        // suspend the fiber until that changes.
3005        if wait {
3006            let set = set.unwrap();
3007
3008            if task.event.is_none() && state.get(set)?.ready.is_empty() {
3009                let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
3010                assert!(old.is_none());
3011
3012                self.suspend(
3013                    store,
3014                    SuspendReason::Waiting {
3015                        set,
3016                        task: guest_task,
3017                    },
3018                )?;
3019            }
3020        }
3021
3022        log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
3023
3024        let result = match check {
3025            // Deliver any pending events to the guest and return.
3026            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3027                let event = self.concurrent_state_mut(store).get_event(
3028                    guest_task,
3029                    params.caller_instance,
3030                    Some(params.set),
3031                )?;
3032
3033                let (ordinal, handle, result) = if wait {
3034                    let (event, waitable) = event.unwrap();
3035                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3036                    let (ordinal, result) = event.parts();
3037                    (ordinal, handle, result)
3038                } else {
3039                    if let Some((event, waitable)) = event {
3040                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3041                        let (ordinal, result) = event.parts();
3042                        (ordinal, handle, result)
3043                    } else {
3044                        log::trace!(
3045                            "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
3046                            params.set
3047                        );
3048                        let (ordinal, result) = Event::None.parts();
3049                        (ordinal, 0, result)
3050                    }
3051                };
3052                let store = store.store_opaque_mut();
3053                let options = Options::new_index(store, self, params.options);
3054                let ptr = func::validate_inbounds::<(u32, u32)>(
3055                    options.memory_mut(store),
3056                    &ValRaw::u32(params.payload),
3057                )?;
3058                options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3059                options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3060                Ok(ordinal)
3061            }
3062            // TODO: Check `GuestTask::event` in case it contains
3063            // `Event::Cancelled`, in which case we'll need to return that to
3064            // the guest:
3065            // https://github.com/bytecodealliance/wasmtime/issues/11191
3066            WaitableCheck::Yield => Ok(0),
3067        };
3068
3069        result
3070    }
3071
3072    /// Implements the `subtask.cancel` intrinsic.
3073    pub(crate) fn subtask_cancel(
3074        self,
3075        store: &mut dyn VMStore,
3076        caller_instance: RuntimeComponentInstanceIndex,
3077        async_: bool,
3078        task_id: u32,
3079    ) -> Result<u32> {
3080        let concurrent_state = self.concurrent_state_mut(store);
3081        let (rep, state) =
3082            concurrent_state.waitable_tables[caller_instance].get_mut_by_index(task_id)?;
3083        let (waitable, expected_caller_instance) = match state {
3084            WaitableState::HostTask => {
3085                let id = TableId::<HostTask>::new(rep);
3086                (
3087                    Waitable::Host(id),
3088                    concurrent_state.get(id)?.caller_instance,
3089                )
3090            }
3091            WaitableState::GuestTask => {
3092                let id = TableId::<GuestTask>::new(rep);
3093                if let Caller::Guest { instance, .. } = &concurrent_state.get(id)?.caller {
3094                    (Waitable::Guest(id), *instance)
3095                } else {
3096                    unreachable!()
3097                }
3098            }
3099            _ => bail!("invalid task handle: {task_id}"),
3100        };
3101        // Since waitables can neither be passed between instances nor forged,
3102        // this should never fail unless there's a bug in Wasmtime, but we check
3103        // here to be sure:
3104        assert_eq!(expected_caller_instance, caller_instance);
3105
3106        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3107
3108        if let Waitable::Host(host_task) = waitable {
3109            if let Some(handle) = concurrent_state.get_mut(host_task)?.abort_handle.take() {
3110                handle.abort();
3111                return Ok(Status::ReturnCancelled as u32);
3112            }
3113        } else {
3114            let caller = concurrent_state.guest_task.unwrap();
3115            let guest_task = TableId::<GuestTask>::new(rep);
3116            let task = concurrent_state.get_mut(guest_task)?;
3117            if task.lower_params.is_some() {
3118                task.lower_params = None;
3119                task.lift_result = None;
3120
3121                // Not yet started; cancel and remove from pending
3122                let callee_instance = task.instance;
3123
3124                let kind = concurrent_state
3125                    .instance_state(callee_instance)
3126                    .pending
3127                    .remove(&guest_task);
3128
3129                if kind.is_none() {
3130                    bail!("`subtask.cancel` called after terminal status delivered");
3131                }
3132
3133                return Ok(Status::StartCancelled as u32);
3134            } else if task.lift_result.is_some() {
3135                // Started, but not yet returned or cancelled; send the
3136                // `CANCELLED` event
3137                task.cancel_sent = true;
3138                // Note that this might overwrite an event that was set earlier
3139                // (e.g. `Event::None` if the task is yielding, or
3140                // `Event::Cancelled` if it was already cancelled), but that's
3141                // okay -- this should supersede the previous state.
3142                task.event = Some(Event::Cancelled);
3143                if let Some(set) = task.wake_on_cancel.take() {
3144                    let item = match concurrent_state
3145                        .get_mut(set)?
3146                        .waiting
3147                        .remove(&guest_task)
3148                        .unwrap()
3149                    {
3150                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3151                        WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3152                            task: guest_task,
3153                            kind: GuestCallKind::DeliverEvent {
3154                                instance,
3155                                set: None,
3156                            },
3157                        }),
3158                    };
3159                    concurrent_state.push_high_priority(item);
3160
3161                    self.suspend(store, SuspendReason::Yielding { task: caller })?;
3162                }
3163
3164                let concurrent_state = self.concurrent_state_mut(store);
3165                let task = concurrent_state.get_mut(guest_task)?;
3166                if task.lift_result.is_some() {
3167                    // Still not yet returned or cancelled; if `async_`, return
3168                    // `BLOCKED`; otherwise wait
3169                    if async_ {
3170                        return Ok(BLOCKED);
3171                    } else {
3172                        let waitable = Waitable::Guest(guest_task);
3173                        let old_set = waitable.common(concurrent_state)?.set;
3174                        let set = concurrent_state.get_mut(caller)?.sync_call_set;
3175                        waitable.join(concurrent_state, Some(set))?;
3176
3177                        self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
3178
3179                        waitable.join(self.concurrent_state_mut(store), old_set)?;
3180                    }
3181                }
3182            }
3183        }
3184
3185        let event = waitable.take_event(self.concurrent_state_mut(store))?;
3186        if let Some(Event::Subtask {
3187            status: status @ (Status::Returned | Status::ReturnCancelled),
3188        }) = event
3189        {
3190            Ok(status as u32)
3191        } else {
3192            bail!("`subtask.cancel` called after terminal status delivered");
3193        }
3194    }
3195
3196    /// Configures TLS state so `store` will be available via `tls::get` within
3197    /// the closure `f` provided.
3198    ///
3199    /// This is used to ensure that `Future::poll`, which doesn't take a `store`
3200    /// parameter, is able to get access to the `store` during future poll
3201    /// methods.
3202    fn set_tls<R>(self, store: &mut dyn VMStore, f: impl FnOnce() -> R) -> R {
3203        struct Reset<'a>(&'a mut dyn VMStore, Option<ComponentInstanceId>);
3204
3205        impl Drop for Reset<'_> {
3206            fn drop(&mut self) {
3207                self.0.concurrent_async_state_mut().current_instance = self.1;
3208            }
3209        }
3210        let prev = mem::replace(
3211            &mut store.concurrent_async_state_mut().current_instance,
3212            Some(self.id().instance()),
3213        );
3214        let reset = Reset(store, prev);
3215
3216        tls::set(reset.0, f)
3217    }
3218
3219    /// Convenience function to reduce boilerplate.
3220    pub(crate) fn concurrent_state_mut<'a>(
3221        &self,
3222        store: &'a mut StoreOpaque,
3223    ) -> &'a mut ConcurrentState {
3224        self.id().get_mut(store).concurrent_state_mut()
3225    }
3226}
3227
3228/// Trait representing component model ABI async intrinsics and fused adapter
3229/// helper functions.
3230///
3231/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3232/// which must be valid for at least the duration of the call (and possibly for
3233/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3234/// pointers used for async calls).
3235pub trait VMComponentAsyncStore {
3236    /// A helper function for fused adapter modules involving calls where the
3237    /// one of the caller or callee is async.
3238    ///
3239    /// This helper is not used when the caller and callee both use the sync
3240    /// ABI, only when at least one is async is this used.
3241    unsafe fn prepare_call(
3242        &mut self,
3243        instance: Instance,
3244        memory: *mut VMMemoryDefinition,
3245        start: *mut VMFuncRef,
3246        return_: *mut VMFuncRef,
3247        caller_instance: RuntimeComponentInstanceIndex,
3248        callee_instance: RuntimeComponentInstanceIndex,
3249        task_return_type: TypeTupleIndex,
3250        string_encoding: u8,
3251        result_count: u32,
3252        storage: *mut ValRaw,
3253        storage_len: usize,
3254    ) -> Result<()>;
3255
3256    /// A helper function for fused adapter modules involving calls where the
3257    /// caller is sync-lowered but the callee is async-lifted.
3258    unsafe fn sync_start(
3259        &mut self,
3260        instance: Instance,
3261        callback: *mut VMFuncRef,
3262        callee: *mut VMFuncRef,
3263        param_count: u32,
3264        storage: *mut MaybeUninit<ValRaw>,
3265        storage_len: usize,
3266    ) -> Result<()>;
3267
3268    /// A helper function for fused adapter modules involving calls where the
3269    /// caller is async-lowered.
3270    unsafe fn async_start(
3271        &mut self,
3272        instance: Instance,
3273        callback: *mut VMFuncRef,
3274        post_return: *mut VMFuncRef,
3275        callee: *mut VMFuncRef,
3276        param_count: u32,
3277        result_count: u32,
3278        flags: u32,
3279    ) -> Result<u32>;
3280
3281    /// The `future.write` intrinsic.
3282    fn future_write(
3283        &mut self,
3284        instance: Instance,
3285        ty: TypeFutureTableIndex,
3286        options: OptionsIndex,
3287        future: u32,
3288        address: u32,
3289    ) -> Result<u32>;
3290
3291    /// The `future.read` intrinsic.
3292    fn future_read(
3293        &mut self,
3294        instance: Instance,
3295        ty: TypeFutureTableIndex,
3296        options: OptionsIndex,
3297        future: u32,
3298        address: u32,
3299    ) -> Result<u32>;
3300
3301    /// The `future.drop-writable` intrinsic.
3302    fn future_drop_writable(
3303        &mut self,
3304        instance: Instance,
3305        ty: TypeFutureTableIndex,
3306        writer: u32,
3307    ) -> Result<()>;
3308
3309    /// The `stream.write` intrinsic.
3310    fn stream_write(
3311        &mut self,
3312        instance: Instance,
3313        ty: TypeStreamTableIndex,
3314        options: OptionsIndex,
3315        stream: u32,
3316        address: u32,
3317        count: u32,
3318    ) -> Result<u32>;
3319
3320    /// The `stream.read` intrinsic.
3321    fn stream_read(
3322        &mut self,
3323        instance: Instance,
3324        ty: TypeStreamTableIndex,
3325        options: OptionsIndex,
3326        stream: u32,
3327        address: u32,
3328        count: u32,
3329    ) -> Result<u32>;
3330
3331    /// The "fast-path" implementation of the `stream.write` intrinsic for
3332    /// "flat" (i.e. memcpy-able) payloads.
3333    fn flat_stream_write(
3334        &mut self,
3335        instance: Instance,
3336        ty: TypeStreamTableIndex,
3337        options: OptionsIndex,
3338        payload_size: u32,
3339        payload_align: u32,
3340        stream: u32,
3341        address: u32,
3342        count: u32,
3343    ) -> Result<u32>;
3344
3345    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3346    /// (i.e. memcpy-able) payloads.
3347    fn flat_stream_read(
3348        &mut self,
3349        instance: Instance,
3350        ty: TypeStreamTableIndex,
3351        options: OptionsIndex,
3352        payload_size: u32,
3353        payload_align: u32,
3354        stream: u32,
3355        address: u32,
3356        count: u32,
3357    ) -> Result<u32>;
3358
3359    /// The `stream.drop-writable` intrinsic.
3360    fn stream_drop_writable(
3361        &mut self,
3362        instance: Instance,
3363        ty: TypeStreamTableIndex,
3364        writer: u32,
3365    ) -> Result<()>;
3366
3367    /// The `error-context.debug-message` intrinsic.
3368    fn error_context_debug_message(
3369        &mut self,
3370        instance: Instance,
3371        ty: TypeComponentLocalErrorContextTableIndex,
3372        options: OptionsIndex,
3373        err_ctx_handle: u32,
3374        debug_msg_address: u32,
3375    ) -> Result<()>;
3376}
3377
3378/// SAFETY: See trait docs.
3379impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3380    unsafe fn prepare_call(
3381        &mut self,
3382        instance: Instance,
3383        memory: *mut VMMemoryDefinition,
3384        start: *mut VMFuncRef,
3385        return_: *mut VMFuncRef,
3386        caller_instance: RuntimeComponentInstanceIndex,
3387        callee_instance: RuntimeComponentInstanceIndex,
3388        task_return_type: TypeTupleIndex,
3389        string_encoding: u8,
3390        result_count_or_max_if_async: u32,
3391        storage: *mut ValRaw,
3392        storage_len: usize,
3393    ) -> Result<()> {
3394        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3395        // this method will have ensured that `storage` is a valid
3396        // pointer containing at least `storage_len` items.
3397        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3398
3399        unsafe {
3400            instance.prepare_call(
3401                StoreContextMut(self),
3402                start,
3403                return_,
3404                caller_instance,
3405                callee_instance,
3406                task_return_type,
3407                memory,
3408                string_encoding,
3409                match result_count_or_max_if_async {
3410                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3411                        params,
3412                        has_result: false,
3413                    },
3414                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3415                        params,
3416                        has_result: true,
3417                    },
3418                    result_count => CallerInfo::Sync {
3419                        params,
3420                        result_count,
3421                    },
3422                },
3423            )
3424        }
3425    }
3426
3427    unsafe fn sync_start(
3428        &mut self,
3429        instance: Instance,
3430        callback: *mut VMFuncRef,
3431        callee: *mut VMFuncRef,
3432        param_count: u32,
3433        storage: *mut MaybeUninit<ValRaw>,
3434        storage_len: usize,
3435    ) -> Result<()> {
3436        unsafe {
3437            instance
3438                .start_call(
3439                    StoreContextMut(self),
3440                    callback,
3441                    ptr::null_mut(),
3442                    callee,
3443                    param_count,
3444                    1,
3445                    START_FLAG_ASYNC_CALLEE,
3446                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3447                    // this method will have ensured that `storage` is a valid
3448                    // pointer containing at least `storage_len` items.
3449                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3450                )
3451                .map(drop)
3452        }
3453    }
3454
3455    unsafe fn async_start(
3456        &mut self,
3457        instance: Instance,
3458        callback: *mut VMFuncRef,
3459        post_return: *mut VMFuncRef,
3460        callee: *mut VMFuncRef,
3461        param_count: u32,
3462        result_count: u32,
3463        flags: u32,
3464    ) -> Result<u32> {
3465        unsafe {
3466            instance.start_call(
3467                StoreContextMut(self),
3468                callback,
3469                post_return,
3470                callee,
3471                param_count,
3472                result_count,
3473                flags,
3474                None,
3475            )
3476        }
3477    }
3478
3479    fn future_write(
3480        &mut self,
3481        instance: Instance,
3482        ty: TypeFutureTableIndex,
3483        options: OptionsIndex,
3484        future: u32,
3485        address: u32,
3486    ) -> Result<u32> {
3487        instance
3488            .guest_write(
3489                StoreContextMut(self),
3490                TableIndex::Future(ty),
3491                options,
3492                None,
3493                future,
3494                address,
3495                1,
3496            )
3497            .map(|result| result.encode())
3498    }
3499
3500    fn future_read(
3501        &mut self,
3502        instance: Instance,
3503        ty: TypeFutureTableIndex,
3504        options: OptionsIndex,
3505        future: u32,
3506        address: u32,
3507    ) -> Result<u32> {
3508        instance
3509            .guest_read(
3510                StoreContextMut(self),
3511                TableIndex::Future(ty),
3512                options,
3513                None,
3514                future,
3515                address,
3516                1,
3517            )
3518            .map(|result| result.encode())
3519    }
3520
3521    fn stream_write(
3522        &mut self,
3523        instance: Instance,
3524        ty: TypeStreamTableIndex,
3525        options: OptionsIndex,
3526        stream: u32,
3527        address: u32,
3528        count: u32,
3529    ) -> Result<u32> {
3530        instance
3531            .guest_write(
3532                StoreContextMut(self),
3533                TableIndex::Stream(ty),
3534                options,
3535                None,
3536                stream,
3537                address,
3538                count,
3539            )
3540            .map(|result| result.encode())
3541    }
3542
3543    fn stream_read(
3544        &mut self,
3545        instance: Instance,
3546        ty: TypeStreamTableIndex,
3547        options: OptionsIndex,
3548        stream: u32,
3549        address: u32,
3550        count: u32,
3551    ) -> Result<u32> {
3552        instance
3553            .guest_read(
3554                StoreContextMut(self),
3555                TableIndex::Stream(ty),
3556                options,
3557                None,
3558                stream,
3559                address,
3560                count,
3561            )
3562            .map(|result| result.encode())
3563    }
3564
3565    fn future_drop_writable(
3566        &mut self,
3567        instance: Instance,
3568        ty: TypeFutureTableIndex,
3569        writer: u32,
3570    ) -> Result<()> {
3571        instance.guest_drop_writable(StoreContextMut(self), TableIndex::Future(ty), writer)
3572    }
3573
3574    fn flat_stream_write(
3575        &mut self,
3576        instance: Instance,
3577        ty: TypeStreamTableIndex,
3578        options: OptionsIndex,
3579        payload_size: u32,
3580        payload_align: u32,
3581        stream: u32,
3582        address: u32,
3583        count: u32,
3584    ) -> Result<u32> {
3585        instance
3586            .guest_write(
3587                StoreContextMut(self),
3588                TableIndex::Stream(ty),
3589                options,
3590                Some(FlatAbi {
3591                    size: payload_size,
3592                    align: payload_align,
3593                }),
3594                stream,
3595                address,
3596                count,
3597            )
3598            .map(|result| result.encode())
3599    }
3600
3601    fn flat_stream_read(
3602        &mut self,
3603        instance: Instance,
3604        ty: TypeStreamTableIndex,
3605        options: OptionsIndex,
3606        payload_size: u32,
3607        payload_align: u32,
3608        stream: u32,
3609        address: u32,
3610        count: u32,
3611    ) -> Result<u32> {
3612        instance
3613            .guest_read(
3614                StoreContextMut(self),
3615                TableIndex::Stream(ty),
3616                options,
3617                Some(FlatAbi {
3618                    size: payload_size,
3619                    align: payload_align,
3620                }),
3621                stream,
3622                address,
3623                count,
3624            )
3625            .map(|result| result.encode())
3626    }
3627
3628    fn stream_drop_writable(
3629        &mut self,
3630        instance: Instance,
3631        ty: TypeStreamTableIndex,
3632        writer: u32,
3633    ) -> Result<()> {
3634        instance.guest_drop_writable(StoreContextMut(self), TableIndex::Stream(ty), writer)
3635    }
3636
3637    fn error_context_debug_message(
3638        &mut self,
3639        instance: Instance,
3640        ty: TypeComponentLocalErrorContextTableIndex,
3641        options: OptionsIndex,
3642        err_ctx_handle: u32,
3643        debug_msg_address: u32,
3644    ) -> Result<()> {
3645        instance.error_context_debug_message(
3646            StoreContextMut(self),
3647            ty,
3648            options,
3649            err_ctx_handle,
3650            debug_msg_address,
3651        )
3652    }
3653}
3654
3655/// Represents the output of a host task or background task.
3656pub(crate) enum HostTaskOutput {
3657    /// A plain result
3658    Result(Result<()>),
3659    /// A function to be run after the future completes (e.g. post-processing
3660    /// which requires access to the store and instance).
3661    Function(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>),
3662}
3663
3664impl HostTaskOutput {
3665    /// Retrieve the result of the host or background task, running the
3666    /// post-processing function if present.
3667    fn consume(self, store: &mut dyn VMStore, instance: Instance) -> Result<()> {
3668        match self {
3669            Self::Function(fun) => fun(store, instance),
3670            Self::Result(result) => result,
3671        }
3672    }
3673}
3674
3675type HostTaskFuture = Pin<Box<dyn Future<Output = HostTaskOutput> + Send + 'static>>;
3676
3677/// Represents the state of a pending host task.
3678struct HostTask {
3679    common: WaitableCommon,
3680    caller_instance: RuntimeComponentInstanceIndex,
3681    abort_handle: Option<AbortHandle>,
3682}
3683
3684impl HostTask {
3685    fn new(
3686        caller_instance: RuntimeComponentInstanceIndex,
3687        abort_handle: Option<AbortHandle>,
3688    ) -> Self {
3689        Self {
3690            common: WaitableCommon::default(),
3691            caller_instance,
3692            abort_handle,
3693        }
3694    }
3695}
3696
3697impl TableDebug for HostTask {
3698    fn type_name() -> &'static str {
3699        "HostTask"
3700    }
3701}
3702
3703type CallbackFn = Box<
3704    dyn Fn(&mut dyn VMStore, Instance, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3705        + Send
3706        + Sync
3707        + 'static,
3708>;
3709
3710/// Represents the caller of a given guest task.
3711enum Caller {
3712    /// The host called the guest task.
3713    Host {
3714        /// If present, may be used to deliver the result.
3715        tx: Option<oneshot::Sender<LiftedResult>>,
3716        /// If true, remove the task from the concurrent state that owns it
3717        /// automatically after it completes.
3718        remove_task_automatically: bool,
3719        /// If true, call `post-return` function (if any) automatically.
3720        call_post_return_automatically: bool,
3721    },
3722    /// Another guest task called the guest task
3723    Guest {
3724        /// The id of the caller
3725        task: TableId<GuestTask>,
3726        /// The instance to use to enforce reentrance rules.
3727        ///
3728        /// Note that this might not be the same as the instance the caller task
3729        /// started executing in given that one or more synchronous guest->guest
3730        /// calls may have occurred involving multiple instances.
3731        instance: RuntimeComponentInstanceIndex,
3732    },
3733}
3734
3735/// Represents a closure and related canonical ABI parameters required to
3736/// validate a `task.return` call at runtime and lift the result.
3737struct LiftResult {
3738    lift: RawLift,
3739    ty: TypeTupleIndex,
3740    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3741    string_encoding: StringEncoding,
3742}
3743
3744/// Represents a pending guest task.
3745struct GuestTask {
3746    /// See `WaitableCommon`
3747    common: WaitableCommon,
3748    /// Closure to lower the parameters passed to this task.
3749    lower_params: Option<RawLower>,
3750    /// See `LiftResult`
3751    lift_result: Option<LiftResult>,
3752    /// A place to stash the type-erased lifted result if it can't be delivered
3753    /// immediately.
3754    result: Option<LiftedResult>,
3755    /// Closure to call the callback function for an async-lifted export, if
3756    /// provided.
3757    callback: Option<CallbackFn>,
3758    /// See `Caller`
3759    caller: Caller,
3760    /// A place to stash the call context for managing resource borrows while
3761    /// switching between guest tasks.
3762    call_context: Option<CallContext>,
3763    /// A place to stash the lowered result for a sync-to-async call until it
3764    /// can be returned to the caller.
3765    sync_result: Option<Option<ValRaw>>,
3766    /// Whether or not the task has been cancelled (i.e. whether the task is
3767    /// permitted to call `task.cancel`).
3768    cancel_sent: bool,
3769    /// Whether or not we've sent a `Status::Starting` event to any current or
3770    /// future waiters for this waitable.
3771    starting_sent: bool,
3772    /// Context-local state used to implement the `context.{get,set}`
3773    /// intrinsics.
3774    context: [u32; 2],
3775    /// Pending guest subtasks created by this task (directly or indirectly).
3776    ///
3777    /// This is used to re-parent subtasks which are still running when their
3778    /// parent task is disposed.
3779    subtasks: HashSet<TableId<GuestTask>>,
3780    /// Scratch waitable set used to watch subtasks during synchronous calls.
3781    sync_call_set: TableId<WaitableSet>,
3782    /// The instance to which the exported function for this guest task belongs.
3783    ///
3784    /// Note that the task may do a sync->sync call via a fused adapter which
3785    /// results in that task executing code in a different instance, and it may
3786    /// call host functions and intrinsics from that other instance.
3787    instance: RuntimeComponentInstanceIndex,
3788    /// If present, a pending `Event::None` or `Event::Cancelled` to be
3789    /// delivered to this task.
3790    event: Option<Event>,
3791    /// If present, indicates that the task is currently waiting on the
3792    /// specified set but may be cancelled and woken immediately.
3793    wake_on_cancel: Option<TableId<WaitableSet>>,
3794    /// The `ExportIndex` of the guest function being called, if known.
3795    function_index: Option<ExportIndex>,
3796    /// Whether or not the task has exited.
3797    exited: bool,
3798}
3799
3800impl GuestTask {
3801    fn new(
3802        state: &mut ConcurrentState,
3803        lower_params: RawLower,
3804        lift_result: LiftResult,
3805        caller: Caller,
3806        callback: Option<CallbackFn>,
3807        component_instance: RuntimeComponentInstanceIndex,
3808    ) -> Result<Self> {
3809        let sync_call_set = state.push(WaitableSet::default())?;
3810
3811        Ok(Self {
3812            common: WaitableCommon::default(),
3813            lower_params: Some(lower_params),
3814            lift_result: Some(lift_result),
3815            result: None,
3816            callback,
3817            caller,
3818            call_context: Some(CallContext::default()),
3819            sync_result: None,
3820            cancel_sent: false,
3821            starting_sent: false,
3822            context: [0u32; 2],
3823            subtasks: HashSet::new(),
3824            sync_call_set,
3825            instance: component_instance,
3826            event: None,
3827            wake_on_cancel: None,
3828            function_index: None,
3829            exited: false,
3830        })
3831    }
3832
3833    /// Dispose of this guest task, reparenting any pending subtasks to the
3834    /// caller.
3835    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3836        // If there are not-yet-delivered completion events for subtasks in
3837        // `self.sync_call_set`, recursively dispose of those subtasks as well.
3838        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3839            if let Some(Event::Subtask {
3840                status: Status::Returned | Status::ReturnCancelled,
3841            }) = waitable.common(state)?.event
3842            {
3843                waitable.delete_from(state)?;
3844            }
3845        }
3846
3847        state.delete(self.sync_call_set)?;
3848
3849        // Reparent any pending subtasks to the caller.
3850        if let Caller::Guest {
3851            task,
3852            instance: runtime_instance,
3853        } = &self.caller
3854        {
3855            let task_mut = state.get_mut(*task)?;
3856            let present = task_mut.subtasks.remove(&me);
3857            assert!(present);
3858
3859            for subtask in &self.subtasks {
3860                task_mut.subtasks.insert(*subtask);
3861            }
3862
3863            for subtask in &self.subtasks {
3864                state.get_mut(*subtask)?.caller = Caller::Guest {
3865                    task: *task,
3866                    instance: *runtime_instance,
3867                };
3868            }
3869        } else {
3870            for subtask in &self.subtasks {
3871                state.get_mut(*subtask)?.caller = Caller::Host {
3872                    tx: None,
3873                    remove_task_automatically: true,
3874                    call_post_return_automatically: true,
3875                };
3876            }
3877        }
3878
3879        Ok(())
3880    }
3881
3882    fn call_post_return_automatically(&self) -> bool {
3883        matches!(
3884            self.caller,
3885            Caller::Guest { .. }
3886                | Caller::Host {
3887                    call_post_return_automatically: true,
3888                    ..
3889                }
3890        )
3891    }
3892}
3893
3894impl TableDebug for GuestTask {
3895    fn type_name() -> &'static str {
3896        "GuestTask"
3897    }
3898}
3899
3900/// Represents state common to all kinds of waitables.
3901#[derive(Default)]
3902struct WaitableCommon {
3903    /// The currently pending event for this waitable, if any.
3904    event: Option<Event>,
3905    /// The set to which this waitable belongs, if any.
3906    set: Option<TableId<WaitableSet>>,
3907}
3908
3909/// Represents a Component Model Async `waitable`.
3910#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3911enum Waitable {
3912    /// A host task
3913    Host(TableId<HostTask>),
3914    /// A guest task
3915    Guest(TableId<GuestTask>),
3916    /// The read or write end of a stream or future
3917    Transmit(TableId<TransmitHandle>),
3918}
3919
3920impl Waitable {
3921    /// Retrieve the `Waitable` corresponding to the specified guest-visible
3922    /// handle.
3923    fn from_instance(
3924        state: &mut ConcurrentState,
3925        caller_instance: RuntimeComponentInstanceIndex,
3926        waitable: u32,
3927    ) -> Result<Self> {
3928        let (waitable, state) =
3929            state.waitable_tables[caller_instance].get_mut_by_index(waitable)?;
3930
3931        Ok(match state {
3932            WaitableState::HostTask => Waitable::Host(TableId::new(waitable)),
3933            WaitableState::GuestTask => Waitable::Guest(TableId::new(waitable)),
3934            WaitableState::Stream(..) | WaitableState::Future(..) => {
3935                Waitable::Transmit(TableId::new(waitable))
3936            }
3937            _ => bail!("invalid waitable handle"),
3938        })
3939    }
3940
3941    /// Retrieve the host-visible identifier for this `Waitable`.
3942    fn rep(&self) -> u32 {
3943        match self {
3944            Self::Host(id) => id.rep(),
3945            Self::Guest(id) => id.rep(),
3946            Self::Transmit(id) => id.rep(),
3947        }
3948    }
3949
3950    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
3951    /// remove it from any set it may currently belong to (when `set` is
3952    /// `None`).
3953    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3954        log::trace!("waitable {self:?} join set {set:?}",);
3955
3956        let old = mem::replace(&mut self.common(state)?.set, set);
3957
3958        if let Some(old) = old {
3959            match *self {
3960                Waitable::Host(id) => state.remove_child(id, old),
3961                Waitable::Guest(id) => state.remove_child(id, old),
3962                Waitable::Transmit(id) => state.remove_child(id, old),
3963            }?;
3964
3965            state.get_mut(old)?.ready.remove(self);
3966        }
3967
3968        if let Some(set) = set {
3969            match *self {
3970                Waitable::Host(id) => state.add_child(id, set),
3971                Waitable::Guest(id) => state.add_child(id, set),
3972                Waitable::Transmit(id) => state.add_child(id, set),
3973            }?;
3974
3975            if self.common(state)?.event.is_some() {
3976                self.mark_ready(state)?;
3977            }
3978        }
3979
3980        Ok(())
3981    }
3982
3983    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
3984    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3985        Ok(match self {
3986            Self::Host(id) => &mut state.get_mut(*id)?.common,
3987            Self::Guest(id) => &mut state.get_mut(*id)?.common,
3988            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3989        })
3990    }
3991
3992    /// Set or clear the pending event for this waitable and either deliver it
3993    /// to the first waiter, if any, or mark it as ready to be delivered to the
3994    /// next waiter that arrives.
3995    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3996        log::trace!("set event for {self:?}: {event:?}");
3997        self.common(state)?.event = event;
3998        self.mark_ready(state)
3999    }
4000
4001    /// Take the pending event from this waitable, leaving `None` in its place.
4002    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4003        let common = self.common(state)?;
4004        let event = common.event.take();
4005        if let Some(set) = self.common(state)?.set {
4006            state.get_mut(set)?.ready.remove(self);
4007        }
4008        Ok(event)
4009    }
4010
4011    /// Deliver the current event for this waitable to the first waiter, if any,
4012    /// or else mark it as ready to be delivered to the next waiter that
4013    /// arrives.
4014    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4015        if let Some(set) = self.common(state)?.set {
4016            state.get_mut(set)?.ready.insert(*self);
4017            if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
4018                let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
4019                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4020
4021                let item = match mode {
4022                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4023                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4024                        task,
4025                        kind: GuestCallKind::DeliverEvent {
4026                            instance,
4027                            set: Some(set),
4028                        },
4029                    }),
4030                };
4031                state.push_high_priority(item);
4032            }
4033        }
4034        Ok(())
4035    }
4036
4037    /// Handle the imminent delivery of the specified event, e.g. by updating
4038    /// the state of the stream or future.
4039    fn on_delivery(&self, state: &mut ConcurrentState, event: Event) {
4040        match event {
4041            Event::FutureRead {
4042                pending: Some((ty, handle)),
4043                ..
4044            }
4045            | Event::FutureWrite {
4046                pending: Some((ty, handle)),
4047                ..
4048            } => {
4049                let runtime_instance = state.component.types()[ty].instance;
4050                let (rep, WaitableState::Future(actual_ty, state)) = state.waitable_tables
4051                    [runtime_instance]
4052                    .get_mut_by_index(handle)
4053                    .unwrap()
4054                else {
4055                    unreachable!()
4056                };
4057                assert_eq!(*actual_ty, ty);
4058                assert_eq!(rep, self.rep());
4059                assert_eq!(*state, StreamFutureState::Busy);
4060                *state = match event {
4061                    Event::FutureRead { .. } => StreamFutureState::Read { done: false },
4062                    Event::FutureWrite { .. } => StreamFutureState::Write { done: false },
4063                    _ => unreachable!(),
4064                };
4065            }
4066            Event::StreamRead {
4067                pending: Some((ty, handle)),
4068                code,
4069            }
4070            | Event::StreamWrite {
4071                pending: Some((ty, handle)),
4072                code,
4073            } => {
4074                let runtime_instance = state.component.types()[ty].instance;
4075                let (rep, WaitableState::Stream(actual_ty, state)) = state.waitable_tables
4076                    [runtime_instance]
4077                    .get_mut_by_index(handle)
4078                    .unwrap()
4079                else {
4080                    unreachable!()
4081                };
4082                assert_eq!(*actual_ty, ty);
4083                assert_eq!(rep, self.rep());
4084                assert_eq!(*state, StreamFutureState::Busy);
4085                let done = matches!(code, ReturnCode::Dropped(_));
4086                *state = match event {
4087                    Event::StreamRead { .. } => StreamFutureState::Read { done },
4088                    Event::StreamWrite { .. } => StreamFutureState::Write { done },
4089                    _ => unreachable!(),
4090                };
4091            }
4092            _ => {}
4093        }
4094    }
4095
4096    /// Remove this waitable from the instance's rep table.
4097    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4098        match self {
4099            Self::Host(task) => {
4100                log::trace!("delete host task {task:?}");
4101                state.delete(*task)?;
4102            }
4103            Self::Guest(task) => {
4104                log::trace!("delete guest task {task:?}");
4105                state.delete(*task)?.dispose(state, *task)?;
4106            }
4107            Self::Transmit(task) => {
4108                state.delete(*task)?;
4109            }
4110        }
4111
4112        Ok(())
4113    }
4114}
4115
4116impl fmt::Debug for Waitable {
4117    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4118        match self {
4119            Self::Host(id) => write!(f, "{id:?}"),
4120            Self::Guest(id) => write!(f, "{id:?}"),
4121            Self::Transmit(id) => write!(f, "{id:?}"),
4122        }
4123    }
4124}
4125
4126/// Represents a Component Model Async `waitable-set`.
4127#[derive(Default)]
4128struct WaitableSet {
4129    /// Which waitables in this set have pending events, if any.
4130    ready: BTreeSet<Waitable>,
4131    /// Which guest tasks are currently waiting on this set, if any.
4132    waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
4133}
4134
4135impl TableDebug for WaitableSet {
4136    fn type_name() -> &'static str {
4137        "WaitableSet"
4138    }
4139}
4140
4141/// Type-erased closure to lower the parameters for a guest task.
4142type RawLower = Box<
4143    dyn FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4144>;
4145
4146/// Type-erased closure to lift the result for a guest task.
4147type RawLift = Box<
4148    dyn FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4149        + Send
4150        + Sync,
4151>;
4152
4153/// Type erased result of a guest task which may be downcast to the expected
4154/// type by a host caller (or simply ignored in the case of a guest caller; see
4155/// `DummyResult`).
4156type LiftedResult = Box<dyn Any + Send + Sync>;
4157
4158/// Used to return a result from a `LiftFn` when the actual result has already
4159/// been lowered to a guest task's stack and linear memory.
4160struct DummyResult;
4161
4162/// Represents the state of a currently executing fiber which has been resumed
4163/// via `self::poll_fn`.
4164pub(crate) struct AsyncState {
4165    /// The current instance being polled, if any, which is used to perform
4166    /// checks to ensure that futures are always polled within the correct
4167    /// instance.
4168    current_instance: Option<ComponentInstanceId>,
4169}
4170
4171impl Default for AsyncState {
4172    fn default() -> Self {
4173        Self {
4174            current_instance: None,
4175        }
4176    }
4177}
4178
4179/// Represents the Component Model Async state of a (sub-)component instance.
4180#[derive(Default)]
4181struct InstanceState {
4182    /// Whether backpressure is set for this instance
4183    backpressure: bool,
4184    /// Whether this instance can be entered
4185    do_not_enter: bool,
4186    /// Pending calls for this instance which require `Self::backpressure` to be
4187    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4188    pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
4189}
4190
4191/// Represents the Component Model Async state of a top-level component instance
4192/// (i.e. a `super::ComponentInstance`).
4193pub struct ConcurrentState {
4194    /// The currently running guest task, if any.
4195    guest_task: Option<TableId<GuestTask>>,
4196    /// The set of pending host and background tasks, if any.
4197    ///
4198    /// We must wrap this in a `Mutex` to ensure that `ComponentInstance` and
4199    /// `Store` satisfy a `Sync` bound, but it can't actually be accessed from
4200    /// more than one thread at a time.
4201    ///
4202    /// See `ComponentInstance::poll_until` for where we temporarily take this
4203    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4204    futures: Mutex<Option<FuturesUnordered<HostTaskFuture>>>,
4205    /// The table of waitables, waitable sets, etc.
4206    table: Table,
4207    /// Per (sub-)component instance states.
4208    ///
4209    /// See `InstanceState` for details and note that this map is lazily
4210    /// populated as needed.
4211    // TODO: this can and should be a `PrimaryMap`
4212    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4213    /// Tables for tracking per-(sub-)component waitable handles and their
4214    /// states.
4215    waitable_tables: PrimaryMap<RuntimeComponentInstanceIndex, StateTable<WaitableState>>,
4216    /// The "high priority" work queue for this instance's event loop.
4217    high_priority: Vec<WorkItem>,
4218    /// The "high priority" work queue for this instance's event loop.
4219    low_priority: Vec<WorkItem>,
4220    /// A place to stash the reason a fiber is suspending so that the code which
4221    /// resumed it will know under what conditions the fiber should be resumed
4222    /// again.
4223    suspend_reason: Option<SuspendReason>,
4224    /// A cached fiber which is waiting for work to do.
4225    ///
4226    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4227    worker: Option<StoreFiber<'static>>,
4228    /// A place to stash the work item for which we're resuming a worker fiber.
4229    worker_item: Option<WorkerItem>,
4230
4231    /// (Sub)Component specific error context tracking
4232    ///
4233    /// At the component level, only the number of references (`usize`) to a given error context is tracked,
4234    /// with state related to the error context being held at the component model level, in concurrent
4235    /// state.
4236    ///
4237    /// The state tables in the (sub)component local tracking must contain a pointer into the global
4238    /// error context lookups in order to ensure that in contexts where only the local reference is present
4239    /// the global state can still be maintained/updated.
4240    error_context_tables:
4241        PrimaryMap<TypeComponentLocalErrorContextTableIndex, StateTable<LocalErrorContextRefCount>>,
4242
4243    /// Reference counts for all component error contexts
4244    ///
4245    /// NOTE: it is possible the global ref count to be *greater* than the sum of
4246    /// (sub)component ref counts as tracked by `error_context_tables`, for
4247    /// example when the host holds one or more references to error contexts.
4248    ///
4249    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4250    /// component-wide representation) of the index into concurrent state for a given
4251    /// stored `ErrorContext`.
4252    ///
4253    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4254    /// as a `TableId<ErrorContextState>`.
4255    global_error_context_ref_counts:
4256        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4257
4258    /// Mirror of type information in `ComponentInstance`, placed here for
4259    /// convenience at the cost of an extra `Arc` clone.
4260    component: Component,
4261}
4262
4263impl ConcurrentState {
4264    pub(crate) fn new(component: &Component) -> Self {
4265        let num_waitable_tables = component.env_component().num_runtime_component_instances;
4266        let num_error_context_tables = component.env_component().num_error_context_tables;
4267        let mut waitable_tables =
4268            PrimaryMap::with_capacity(usize::try_from(num_waitable_tables).unwrap());
4269        for _ in 0..num_waitable_tables {
4270            waitable_tables.push(StateTable::default());
4271        }
4272
4273        let mut error_context_tables = PrimaryMap::<
4274            TypeComponentLocalErrorContextTableIndex,
4275            StateTable<LocalErrorContextRefCount>,
4276        >::with_capacity(num_error_context_tables);
4277        for _ in 0..num_error_context_tables {
4278            error_context_tables.push(StateTable::default());
4279        }
4280
4281        Self {
4282            guest_task: None,
4283            table: Table::new(),
4284            futures: Mutex::new(Some(FuturesUnordered::new())),
4285            instance_states: HashMap::new(),
4286            waitable_tables,
4287            high_priority: Vec::new(),
4288            low_priority: Vec::new(),
4289            suspend_reason: None,
4290            worker: None,
4291            worker_item: None,
4292            error_context_tables,
4293            global_error_context_ref_counts: BTreeMap::new(),
4294            component: component.clone(),
4295        }
4296    }
4297
4298    /// Take ownership of any fibers and futures owned by this object.
4299    ///
4300    /// This should be used when disposing of the `Store` containing this object
4301    /// in order to gracefully resolve any and all fibers using
4302    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4303    /// use-after-free bugs due to fibers which may still have access to the
4304    /// `Store`.
4305    ///
4306    /// Additionally, the futures collected with this function should be dropped
4307    /// within a `tls::set` call, which will ensure than any futures closing
4308    /// over an `&Accessor` will have access to the store when dropped, allowing
4309    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4310    /// panicking.
4311    ///
4312    /// Note that this will leave the object in an inconsistent and unusable
4313    /// state, so it should only be used just prior to dropping it.
4314    pub(crate) fn take_fibers_and_futures(
4315        &mut self,
4316        fibers: &mut Vec<StoreFiber<'static>>,
4317        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4318    ) {
4319        for entry in self.table.iter_mut() {
4320            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4321                for mode in mem::take(&mut set.waiting).into_values() {
4322                    if let WaitMode::Fiber(fiber) = mode {
4323                        fibers.push(fiber);
4324                    }
4325                }
4326            }
4327        }
4328
4329        if let Some(fiber) = self.worker.take() {
4330            fibers.push(fiber);
4331        }
4332
4333        let mut take_items = |list| {
4334            for item in mem::take(list) {
4335                match item {
4336                    WorkItem::ResumeFiber(fiber) => {
4337                        fibers.push(fiber);
4338                    }
4339                    WorkItem::PushFuture(future) => {
4340                        self.futures
4341                            .get_mut()
4342                            .unwrap()
4343                            .as_mut()
4344                            .unwrap()
4345                            .push(future.into_inner().unwrap());
4346                    }
4347                    _ => {}
4348                }
4349            }
4350        };
4351
4352        take_items(&mut self.high_priority);
4353        take_items(&mut self.low_priority);
4354
4355        if let Some(them) = self.futures.get_mut().unwrap().take() {
4356            futures.push(them);
4357        }
4358    }
4359}
4360
4361/// Provide a type hint to compiler about the shape of a parameter lower
4362/// closure.
4363fn for_any_lower<
4364    F: FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4365>(
4366    fun: F,
4367) -> F {
4368    fun
4369}
4370
4371/// Provide a type hint to compiler about the shape of a result lift closure.
4372fn for_any_lift<
4373    F: FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4374        + Send
4375        + Sync,
4376>(
4377    fun: F,
4378) -> F {
4379    fun
4380}
4381
4382/// Wrap the specified future in a `poll_fn` which asserts that the future is
4383/// only polled from the event loop of the specified `Instance`.
4384///
4385/// See `Instance::run_concurrent` for details.
4386fn checked<F: Future + Send + 'static>(
4387    instance: Instance,
4388    fut: F,
4389) -> impl Future<Output = F::Output> + Send + 'static {
4390    async move {
4391        let mut fut = pin!(fut);
4392        future::poll_fn(move |cx| {
4393            let message = "\
4394                `Future`s which depend on asynchronous component tasks, streams, or \
4395                futures to complete may only be polled from the event loop of the \
4396                instance from which they originated.  Please use \
4397                `Instance::{run_concurrent,spawn}` to poll or await them.\
4398            ";
4399            tls::try_get(|store| {
4400                let matched = match store {
4401                    tls::TryGet::Some(store) => {
4402                        let a = store.concurrent_async_state_mut().current_instance;
4403                        a == Some(instance.id().instance())
4404                    }
4405                    tls::TryGet::Taken | tls::TryGet::None => false,
4406                };
4407
4408                if !matched {
4409                    panic!("{message}")
4410                }
4411            });
4412            fut.as_mut().poll(cx)
4413        })
4414        .await
4415    }
4416}
4417
4418/// Assert that `Instance::run_concurrent` has not been called from within an
4419/// instance's event loop.
4420fn check_recursive_run() {
4421    tls::try_get(|store| {
4422        if !matches!(store, tls::TryGet::None) {
4423            panic!("Recursive `Instance::run_concurrent` calls not supported")
4424        }
4425    });
4426}
4427
4428fn unpack_callback_code(code: u32) -> (u32, u32) {
4429    (code & 0xF, code >> 4)
4430}
4431
4432/// Helper struct for packaging parameters to be passed to
4433/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4434/// `waitable-set.poll`.
4435struct WaitableCheckParams {
4436    set: TableId<WaitableSet>,
4437    caller_instance: RuntimeComponentInstanceIndex,
4438    options: OptionsIndex,
4439    payload: u32,
4440}
4441
4442/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4443enum WaitableCheck {
4444    Wait(WaitableCheckParams),
4445    Poll(WaitableCheckParams),
4446    Yield,
4447}
4448
4449/// Represents a guest task called from the host, prepared using `prepare_call`.
4450pub(crate) struct PreparedCall<R> {
4451    /// The guest export to be called
4452    handle: Func,
4453    /// The guest task created by `prepare_call`
4454    task: TableId<GuestTask>,
4455    /// The number of lowered core Wasm parameters to pass to the call.
4456    param_count: usize,
4457    /// The `oneshot::Receiver` to which the result of the call will be
4458    /// delivered when it is available.
4459    rx: oneshot::Receiver<LiftedResult>,
4460    _phantom: PhantomData<R>,
4461}
4462
4463impl<R> PreparedCall<R> {
4464    /// Get a copy of the `TaskId` for this `PreparedCall`.
4465    pub(crate) fn task_id(&self) -> TaskId {
4466        TaskId {
4467            handle: self.handle,
4468            task: self.task,
4469        }
4470    }
4471}
4472
4473/// Represents a task created by `prepare_call`.
4474pub(crate) struct TaskId {
4475    handle: Func,
4476    task: TableId<GuestTask>,
4477}
4478
4479impl TaskId {
4480    /// Remove the specified task from the concurrent state to which it belongs.
4481    ///
4482    /// This must be used with care to avoid use-after-delete or double-delete
4483    /// bugs.  Specifically, it should only be called on tasks created with the
4484    /// `remove_task_automatically` parameter to `prepare_call` set to `false`,
4485    /// which tells the runtime that the caller is responsible for removing the
4486    /// task from the state; otherwise, it will be removed automatically.  Also,
4487    /// it should only be called once for a given task, and only after either
4488    /// the task has completed or the instance has trapped.
4489    pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4490        Waitable::Guest(self.task).delete_from(self.handle.instance().concurrent_state_mut(store.0))
4491    }
4492}
4493
4494/// Prepare a call to the specified exported Wasm function, providing functions
4495/// for lowering the parameters and lifting the result.
4496///
4497/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4498/// loop, use `queue_call`.
4499pub(crate) fn prepare_call<T, R>(
4500    mut store: StoreContextMut<T>,
4501    handle: Func,
4502    param_count: usize,
4503    remove_task_automatically: bool,
4504    call_post_return_automatically: bool,
4505    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4506    + Send
4507    + Sync
4508    + 'static,
4509    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4510    + Send
4511    + Sync
4512    + 'static,
4513) -> Result<PreparedCall<R>> {
4514    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4515
4516    let instance = handle.instance().id().get(store.0);
4517    let task_return_type = instance.component().types()[ty].results;
4518    let component_instance = raw_options.instance;
4519    let callback = options.callback();
4520    let memory = options.memory_raw().map(SendSyncPtr::new);
4521    let string_encoding = options.string_encoding();
4522    let token = StoreToken::new(store.as_context_mut());
4523    let state = handle.instance().concurrent_state_mut(store.0);
4524
4525    assert!(state.guest_task.is_none());
4526
4527    let (tx, rx) = oneshot::channel();
4528
4529    let mut task = GuestTask::new(
4530        state,
4531        Box::new(for_any_lower(move |store, instance, params| {
4532            debug_assert!(instance.id() == handle.instance().id());
4533            lower_params(handle, token.as_context_mut(store), params)
4534        })),
4535        LiftResult {
4536            lift: Box::new(for_any_lift(move |store, instance, result| {
4537                debug_assert!(instance.id() == handle.instance().id());
4538                lift_result(handle, store, result)
4539            })),
4540            ty: task_return_type,
4541            memory,
4542            string_encoding,
4543        },
4544        Caller::Host {
4545            tx: Some(tx),
4546            remove_task_automatically,
4547            call_post_return_automatically,
4548        },
4549        callback.map(|callback| {
4550            let callback = SendSyncPtr::new(callback);
4551            Box::new(
4552                move |store: &mut dyn VMStore,
4553                      instance: Instance,
4554                      runtime_instance,
4555                      event,
4556                      handle| {
4557                    let store = token.as_context_mut(store);
4558                    // SAFETY: Per the contract of `prepare_call`, the callback
4559                    // will remain valid at least as long is this task exists.
4560                    unsafe {
4561                        instance.call_callback(
4562                            store,
4563                            runtime_instance,
4564                            callback,
4565                            event,
4566                            handle,
4567                            call_post_return_automatically,
4568                        )
4569                    }
4570                },
4571            ) as CallbackFn
4572        }),
4573        component_instance,
4574    )?;
4575    task.function_index = Some(handle.index());
4576
4577    let task = state.push(task)?;
4578
4579    Ok(PreparedCall {
4580        handle,
4581        task,
4582        param_count,
4583        rx,
4584        _phantom: PhantomData,
4585    })
4586}
4587
4588/// Queue a call previously prepared using `prepare_call` to be run as part of
4589/// the associated `ComponentInstance`'s event loop.
4590///
4591/// The returned future will resolve to the result once it is available, but
4592/// must only be polled via the instance's event loop. See
4593/// `Instance::run_concurrent` for details.
4594pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4595    mut store: StoreContextMut<T>,
4596    prepared: PreparedCall<R>,
4597) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
4598    let PreparedCall {
4599        handle,
4600        task,
4601        param_count,
4602        rx,
4603        ..
4604    } = prepared;
4605
4606    queue_call0(store.as_context_mut(), handle, task, param_count)?;
4607
4608    Ok(checked(
4609        handle.instance(),
4610        rx.map(|result| {
4611            result
4612                .map(|v| *v.downcast().unwrap())
4613                .map_err(anyhow::Error::from)
4614        }),
4615    ))
4616}
4617
4618/// Queue a call previously prepared using `prepare_call` to be run as part of
4619/// the associated `ComponentInstance`'s event loop.
4620fn queue_call0<T: 'static>(
4621    store: StoreContextMut<T>,
4622    handle: Func,
4623    guest_task: TableId<GuestTask>,
4624    param_count: usize,
4625) -> Result<()> {
4626    let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4627    let is_concurrent = raw_options.async_;
4628    let instance = handle.instance();
4629    let callee = handle.lifted_core_func(store.0);
4630    let callback = options.callback();
4631    let post_return = handle.post_return_core_func(store.0);
4632
4633    log::trace!("queueing call {guest_task:?}");
4634
4635    let instance_flags = if callback.is_none() {
4636        None
4637    } else {
4638        Some(flags)
4639    };
4640
4641    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
4642    // (with signatures appropriate for this call) and will remain valid as
4643    // long as this instance is valid.
4644    unsafe {
4645        instance.queue_call(
4646            store,
4647            guest_task,
4648            SendSyncPtr::new(callee),
4649            param_count,
4650            1,
4651            instance_flags,
4652            is_concurrent,
4653            callback.map(SendSyncPtr::new),
4654            post_return.map(SendSyncPtr::new),
4655        )
4656    }
4657}