wasmtime/runtime/component/concurrent.rs

//! Runtime support for the Component Model Async ABI.
//!
//! This module and its submodules provide host runtime support for Component
//! Model Async features such as async-lifted exports, async-lowered imports,
//! streams, futures, and related intrinsics.  See [the Async
//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
//! for a high-level overview.
//!
//! At the core of this support is an event loop which schedules and switches
//! between guest tasks and any host tasks they create.  Each
//! `ComponentInstance` will have at most one event loop running at any given
//! time, and that loop may be suspended and resumed by the host embedder using
//! e.g. `Instance::run_concurrent`.  The `Instance::poll_until` function
//! contains the loop itself, while the `ComponentInstance::concurrent_state`
//! field holds its state.
//!
//! # Public API Overview
//!
//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
//!
//! - `Instance::run_concurrent`: Run the event loop for the specified instance,
//! allowing any and all tasks belonging to that instance to make progress.
//!
//! - `Instance::spawn`: Run a background task as part of the event loop for the
//! specified instance.
//!
//! - `Instance::{future,stream}`: Create a new Component Model `future` or
//! `stream`; the read end may be passed to the guest.
//!
//! - `{Future,Stream}Reader::read` and `{Future,Stream}Writer::write`: read
//! from or write to a future or stream, respectively.
//!
//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
//!
//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
//! function with the linker.  That function will take an `Accessor` as its
//! first parameter, which provides access to the store and instance between
//! (but not across) await points.
//!
//! - `Accessor::with`: Access the store, its associated data, and the current
//! instance.
//!
//! - `Accessor::spawn`: Run a background task as part of the event loop for the
//! specified instance.  This is equivalent to `Instance::spawn` but more
//! convenient to use in host functions.
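//!
//! As a brief, illustrative sketch (not compiled as a doctest; `MyCtx`,
//! `"double"`, and `run` are placeholders, and the exact
//! `func_wrap_concurrent` closure signature is abbreviated), the pieces fit
//! together roughly like this:
//!
//! ```ignore
//! // Host side: a concurrent import which receives an `&Accessor<MyCtx>`.
//! linker.root().func_wrap_concurrent("double", |accessor, (n,): (u32,)| {
//!     async move {
//!         // Store access is available between, but not across, await points.
//!         let doubled = accessor.with(|mut access| {
//!             access.data_mut().calls += 1;
//!             n * 2
//!         });
//!         Ok((doubled,))
//!     }
//! })?;
//!
//! // Embedder side: drive the instance's event loop while calling an export
//! // (`run` is a `TypedFunc` looked up beforehand).
//! instance
//!     .run_concurrent(&mut store, async |accessor| {
//!         run.call_concurrent(accessor, (7u32,)).await
//!     })
//!     .await??;
//! ```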

use crate::component::func::{self, Func, Options};
use crate::component::{Component, ComponentInstanceId, HasData, HasSelf, Instance};
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::store::{StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{
    CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
};
use crate::vm::{SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
use anyhow::{Context as _, Result, anyhow, bail};
use error_contexts::GlobalErrorContextRefCount;
use futures::channel::oneshot;
use futures::future::{self, Either, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
use std::any::Any;
use std::borrow::ToOwned;
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ops::DerefMut;
use std::pin::{Pin, pin};
use std::ptr::{self, NonNull};
use std::slice;
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};
use std::vec::Vec;
use table::{Table, TableDebug, TableError, TableId};
use wasmtime_environ::component::{
    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
    RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
    TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
    TypeTupleIndex,
};

pub use abort::JoinHandle;
pub use futures_and_streams::{
    ErrorContext, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter,
    GuardedStreamReader, GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VecBuffer,
    WriteBuffer,
};
pub(crate) use futures_and_streams::{
    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
};

mod abort;
mod error_contexts;
mod futures_and_streams;
mod table;
pub(crate) mod tls;

/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
const BLOCKED: u32 = 0xffff_ffff;

/// Corresponds to `CallState` in the upstream spec.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and the optional `waitable` provided into a 32-bit
    /// result that the canonical ABI requires.
    ///
    /// The low 4 bits are reserved for the status while the upper 28 bits are
    /// the waitable, if present.
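    ///
    /// For example, packing `Status::Started` (1) with waitable handle 5
    /// yields `(5 << 4) | 1 == 0x51`, while `Status::Returned` must be packed
    /// without a waitable and encodes as just `2`.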
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or(0);
        assert!(waitable < (1 << 28));
        (waitable << 4) | (self as u32)
    }
}

/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
#[derive(Clone, Copy, Debug)]
enum Event {
    None,
    Cancelled,
    Subtask {
        status: Status,
    },
    StreamRead {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}

impl Event {
    /// Lower this event to core Wasm integers for delivery to the guest.
    ///
    /// Note that the waitable handle, if any, is assumed to be lowered
    /// separately.
    fn parts(self) -> (u32, u32) {
        const EVENT_NONE: u32 = 0;
        const EVENT_SUBTASK: u32 = 1;
        const EVENT_STREAM_READ: u32 = 2;
        const EVENT_STREAM_WRITE: u32 = 3;
        const EVENT_FUTURE_READ: u32 = 4;
        const EVENT_FUTURE_WRITE: u32 = 5;
        const EVENT_CANCELLED: u32 = 6;
        match self {
            Event::None => (EVENT_NONE, 0),
            Event::Cancelled => (EVENT_CANCELLED, 0),
            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
        }
    }
}

/// Corresponds to `CallbackCode` in the spec.
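///
/// Per the Component Model Async ABI, the guest packs one of these codes into
/// the low 4 bits of the value returned by an async-lifted export (or its
/// callback); for `WAIT` and `POLL` the upper 28 bits carry a waitable-set
/// handle.  See `unpack_callback_code` for the decoding.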
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
    pub const POLL: u32 = 3;
}

/// A flag indicating that the callee is an async-lowered export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;

/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    accessor: &'a Accessor<T, D>,
    store: StoreContextMut<'a, T>,
}

impl<'a, T, D> Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Get mutable access to the store data.
    pub fn data_mut(&mut self) -> &mut T {
        self.store.data_mut()
    }

    /// Get mutable access to the data projected from the store data via `D`.
    pub fn get(&mut self) -> D::Data<'_> {
        let get_data = self.accessor.get_data;
        get_data(self.data_mut())
    }

    /// Spawn a background task.
    ///
    /// See [`Accessor::spawn`] for details.
    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        self.accessor.instance.unwrap().spawn_with_accessor(
            self.store.as_context_mut(),
            self.accessor.clone_for_spawn(),
            task,
        )
    }

    /// Retrieve the component instance of the caller.
    pub fn instance(&self) -> Instance {
        self.accessor.instance()
    }
}

impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}

impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}

/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`; unlike those, however, it does not literally store these
/// values. The basic problem is that when a concurrent host future is being
/// polled it has access to `&mut T` (and the whole `Store`), but when it's not
/// being polled it does not have access to these values. This reflects how the
/// store only ever polls one future at a time, so the store is effectively
/// passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument; instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. It is
/// instead a proof of the ability to acquire these while a future is being
/// polled. Wasmtime will, when it polls a host future, configure ambient state
/// such that the `Accessor` that a future closes over will work and be able to
/// access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned; it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive, probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T`, which
///   alleviates some bounds that would otherwise be required.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they will panic or have unintended results. If you run into this, though,
/// feel free to file an issue on the Wasmtime repository.
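///
/// # Example
///
/// A minimal sketch (not compiled as a doctest; the `Ctx` type, its fields,
/// and the `fetch` helper are placeholders) of how a concurrent host function
/// typically uses its `Accessor`:
///
/// ```ignore
/// async fn handle(accessor: &Accessor<Ctx>, url: String) -> Result<String> {
///     // Synchronous, scoped access to the store and its data.
///     let started = accessor.with(|mut access| {
///         access.data_mut().requests += 1;
///         std::time::Instant::now()
///     });
///
///     // No store access is held across this await point.
///     let body = fetch(&url).await?;
///
///     accessor.with(|mut access| {
///         access.data_mut().last_latency = Some(started.elapsed());
///     });
///     Ok(body)
/// }
/// ```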
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    token: StoreToken<T>,
    get_data: fn(&mut T) -> D::Data<'_>,
    instance: Option<Instance>,
}

/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through [`Instance::run_concurrent`]
/// for example or in a host function through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
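///
/// For example (a sketch, not compiled as a doctest), a helper that only needs
/// an accessor can be written once and accept either an `Accessor<T>` or an
/// `Accessor<T, D>` for any `D`:
///
/// ```ignore
/// fn current_instance(acc: impl AsAccessor) -> Instance {
///     // Works for any `T`/`D` combination behind the accessor.
///     acc.as_accessor().instance()
/// }
/// ```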
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}

impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}

impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}

// Note that it is intentional at this time that `Accessor` does not actually
// store `&mut T` or anything similar. This distinctly enables the `Accessor`
// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
// that matter). This is used to ergonomically simplify bindings where the
// majority of the time `Accessor` is closed over in a future which then needs
// to be `Send` and `Sync`. Avoiding the need to write `T: Send` everywhere (as
// you already have to write `T: 'static`...) helps keep such bounds to a
// minimum.
//
// Note as well that `Accessor` doesn't actually store its data at all. Instead
// it's more of a "proof" of what can be accessed from TLS. API design around
// `Accessor` and functions like `Linker::func_wrap_concurrent` are
// intentionally made to ensure that `Accessor` is ideally only used in the
// context that TLS variables are actually set. For example host functions are
// given `&Accessor`, not `Accessor`, and this prevents them from persisting
// the value outside of a future. Within the future the TLS variables are all
// guaranteed to be set while the future is being polled.
//
// Finally, this is not an ironclad guarantee, but nor does it need to be.
// The TLS APIs are designed to panic or otherwise model usage where they're
// called recursively or similar. It's hoped that code cannot be constructed to
// actually hit this at runtime but this is not a safety requirement at this
// time.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};

impl<T> Accessor<T> {
    /// Creates a new `Accessor` from the specified parts.
    ///
    /// - `token`: proof of the store's data type, used to recover a typed
    /// store handle when the store is later retrieved from ambient context
    ///
    /// - `instance`: the `Instance` to which this `Accessor` (and the future
    /// which closes over it) belongs, if any
    ///
    /// The data projection defaults to the identity function; see
    /// [`Accessor::with_data`] to "project" from the store's associated data
    /// to another type (e.g. a field of that data or a wrapper around it).
    pub(crate) fn new(token: StoreToken<T>, instance: Option<Instance>) -> Self {
        Self {
            token,
            get_data: |x| x,
            instance,
        }
    }
}

impl<T, D> Accessor<T, D>
where
    D: HasData + ?Sized,
{
    /// Run the specified closure, passing it mutable access to the store.
    ///
    /// This function is one of the main building blocks of the [`Accessor`]
    /// type. This yields synchronous, blocking access to the store via an
    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
    /// providing the ability to access `D` via [`Access::get`]. Note that the
    /// `fun` here is given only temporary access to the store and `T`/`D`,
    /// meaning that the return value `R` here is not allowed to capture borrows
    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
    ///
    /// # Panics
    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example if `with` is called within `fun`,
    /// then this function will panic. It is up to the embedder to ensure that
    /// this does not happen.
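    ///
    /// # Example
    ///
    /// A small sketch (not compiled as a doctest; `accessor` is an
    /// `Accessor<Ctx>` for a placeholder `Ctx` type with a `name` field)
    /// showing that data must be moved or cloned out of the closure rather
    /// than borrowed:
    ///
    /// ```ignore
    /// let name: String = accessor.with(|mut access| {
    ///     // The borrow of the store ends when this closure returns, so
    ///     // clone anything that needs to outlive it.
    ///     access.data_mut().name.clone()
    /// });
    /// ```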
    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                accessor: self,
            })
        })
    }

    /// Changes this accessor to access `D2` instead of the current type
    /// parameter `D`.
    ///
    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
    ///
    /// Note that this is not a public or recommended API because it's easy to
    /// cause panics with this by having two `Accessor` values live at the same
    /// time. The returned `Accessor` does not refer to this `Accessor`, meaning
    /// that both can be used. You could, for example, call `Accessor::with`
    /// simultaneously on both. That would cause a panic though.
    ///
    /// In short, while there's nothing unsafe about this, it's a footgun. It's
    /// here for bindings generation where the provided accessor is transformed
    /// into a new accessor and then this returned accessor is passed to
    /// implementations.
    ///
    /// Note that one possible fix for this would be a lifetime parameter on
    /// `Accessor` itself so the returned value could borrow from the original
    /// value (or this could be `self`-by-value instead of `&self`) but in
    /// attempting that it was found to be a bit too onerous in terms of
    /// plumbing things around without a whole lot of benefit.
    ///
    /// In short, this works, but must be treated with care. The current main
    /// user, bindings generation, treats this with care.
    #[doc(hidden)]
    pub fn with_data<D2: HasData>(&self, get_data: fn(&mut T) -> D2::Data<'_>) -> Accessor<T, D2> {
        Accessor {
            token: self.token,
            get_data,
            instance: self.instance,
        }
    }

    /// Spawn a background task which will receive an `&Accessor<T, D>` and
    /// run concurrently with any other tasks in progress for the current
    /// instance.
    ///
    /// This is particularly useful for host functions which return a `stream`
    /// or `future` such that the code to write to the write end of that
    /// `stream` or `future` must run after the function returns.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    ///
    /// # Panics
    ///
    /// Panics if called within a closure provided to the [`Accessor::with`]
    /// function. This can only be called outside an active invocation of
    /// [`Accessor::with`].
    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        let instance = self.instance.unwrap();
        let accessor = self.clone_for_spawn();
        self.with(|mut access| {
            instance.spawn_with_accessor(access.as_context_mut(), accessor, task)
        })
    }

    /// Retrieve the component instance of the caller.
    pub fn instance(&self) -> Instance {
        self.instance.unwrap()
    }

    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
            instance: self.instance,
        }
    }
}

/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `Instance::spawn`.
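///
/// A minimal sketch (not compiled as a doctest; the `Ctx` type and its `ticks`
/// field are placeholders, and the final line is assumed to run inside a host
/// task holding an `accessor`) of implementing and spawning such a task:
///
/// ```ignore
/// struct Tick;
///
/// impl AccessorTask<Ctx, HasSelf<Ctx>, Result<()>> for Tick {
///     fn run(self, accessor: &Accessor<Ctx>) -> impl Future<Output = Result<()>> + Send {
///         async move {
///             // Runs as part of the instance's event loop, concurrently with
///             // other tasks; store access still goes through `Accessor::with`.
///             accessor.with(|mut access| access.data_mut().ticks += 1);
///             Ok(())
///         }
///     }
/// }
///
/// let handle = accessor.spawn(Tick);
/// ```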
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// `AsyncFnOnce` is still nightly-only as of the latest stable Rust version at
// the time of this writing (1.84.1), and even with 1.85.0-beta it's not
// possible to specify e.g. `Send` and `Sync` bounds on the `Future` type
// returned by an `AsyncFnOnce`.  Also, using `F: Future<Output = Result<()>> +
// Send + Sync, FN: FnOnce(&Accessor<T>) -> F + Send + Sync + 'static` fails
// with a type mismatch error when we try to pass it an async closure (e.g.
// `async move |_| { ... }`).  So this seems to be the best we can do for the
// time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}

/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        params: Vec<ValRaw>,
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        params: Vec<ValRaw>,
        result_count: u32,
    },
}

/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback,
}

/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        set: TableId<WaitableSet>,
        task: TableId<GuestTask>,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding { task: TableId<GuestTask> },
}

/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    Start(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>),
}

impl fmt::Debug for GuestCallKind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DeliverEvent { set } => f.debug_struct("DeliverEvent").field("set", set).finish(),
            Self::Start(_) => f.debug_tuple("Start").finish(),
        }
    }
}

/// Represents a pending call into guest code for a given guest task.
#[derive(Debug)]
struct GuestCall {
    task: TableId<GuestTask>,
    kind: GuestCallKind,
}

impl GuestCall {
    /// Returns whether or not the call is ready to run.
    ///
    /// A call will not be ready to run if either:
    ///
    /// - the (sub-)component instance to be called has already been entered and
    /// cannot be reentered until an in-progress call completes
    ///
    /// - the call is for a not-yet started task and the (sub-)component
    /// instance to be called has backpressure enabled
    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
        let task_instance = state.get(self.task)?.instance;
        let state = state.instance_state(task_instance);
        let ready = match &self.kind {
            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
            GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure),
        };
        log::trace!(
            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
            state.do_not_enter,
            state.backpressure
        );
        Ok(ready)
    }
}

/// Job to be run on a worker fiber.
enum WorkerItem {
    GuestCall(GuestCall),
    Function(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}

/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
#[derive(Debug)]
struct PollParams {
    /// Identifies the polling task.
    task: TableId<GuestTask>,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}

/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(Mutex<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber.
    WorkerFunction(Mutex<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}

impl fmt::Debug for WorkItem {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
        }
    }
}

impl ComponentInstance {
    /// Handle the `CallbackCode` returned from an async-lifted export or its
    /// callback.
    ///
    /// If `initial_call` is `true`, then the code was received from the
    /// async-lifted export; otherwise, it was received from its callback.
    fn handle_callback_code(
        mut self: Pin<&mut Self>,
        guest_task: TableId<GuestTask>,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
        initial_call: bool,
    ) -> Result<()> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");

        let state = self.as_mut().concurrent_state_mut();
        let task = state.get_mut(guest_task)?;

        if task.lift_result.is_some() {
            if code == callback_code::EXIT {
                return Err(anyhow!(crate::Trap::NoAsyncResult));
            }
            if initial_call {
                // Notify any current or future waiters that this subtask has
                // started.
                Waitable::Guest(guest_task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
            }
        }

        let get_set = |instance: Pin<&mut Self>, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = instance.guest_tables().0[runtime_instance].waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        match code {
            callback_code::EXIT => {
                let task = state.get_mut(guest_task)?;
                match &task.caller {
                    Caller::Host {
                        remove_task_automatically,
                        ..
                    } => {
                        if *remove_task_automatically {
                            log::trace!("handle_callback_code will delete task {guest_task:?}");
                            Waitable::Guest(guest_task).delete_from(state)?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                        task.callback = None;
                    }
                }
            }
            callback_code::YIELD => {
                // Push this task onto the "low priority" queue so it runs after
                // any other tasks have had a chance to run.
                let task = state.get_mut(guest_task)?;
                assert!(task.event.is_none());
                task.event = Some(Event::None);
                state.push_low_priority(WorkItem::GuestCall(GuestCall {
                    task: guest_task,
                    kind: GuestCallKind::DeliverEvent { set: None },
                }));
            }
            callback_code::WAIT | callback_code::POLL => {
                let set = get_set(self.as_mut(), set)?;
                let state = self.concurrent_state_mut();

                if state.get_mut(guest_task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is immediately available; deliver it ASAP.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        task: guest_task,
                        kind: GuestCallKind::DeliverEvent { set: Some(set) },
                    }));
                } else {
                    // No event is immediately available.
                    match code {
                        callback_code::POLL => {
                            // We're polling, so just yield and check whether an
                            // event has arrived after that.
                            state.push_low_priority(WorkItem::Poll(PollParams {
                                task: guest_task,
                                set,
                            }));
                        }
                        callback_code::WAIT => {
                            // We're waiting, so register to be woken up when an
                            // event is published for this waitable set.
                            //
                            // Here we also set `GuestTask::wake_on_cancel`
                            // which allows `subtask.cancel` to interrupt the
                            // wait.
                            let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
                            assert!(old.is_none());
                            let old = state
                                .get_mut(set)?
                                .waiting
                                .insert(guest_task, WaitMode::Callback);
                            assert!(old.is_none());
                        }
                        _ => unreachable!(),
                    }
                }
            }
            _ => bail!("unsupported callback code: {code}"),
        }

        Ok(())
    }

    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
    fn get_event(
        mut self: Pin<&mut Self>,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = self.as_mut().concurrent_state_mut();

        Ok(
            if let Some(event) = state.get_mut(guest_task)?.event.take() {
                log::trace!("deliver event {event:?} to {guest_task:?}");

                Some((event, None))
            } else if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(self, event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }

    /// Implements the `waitable-set.new` intrinsic.
    pub(crate) fn waitable_set_new(
        mut self: Pin<&mut Self>,
        caller_instance: RuntimeComponentInstanceIndex,
    ) -> Result<u32> {
        let set = self
            .as_mut()
            .concurrent_state_mut()
            .push(WaitableSet::default())?;
        let handle = self.guest_tables().0[caller_instance].waitable_set_insert(set.rep())?;
        log::trace!("new waitable set {set:?} (handle {handle})");
        Ok(handle)
    }

    /// Implements the `waitable-set.drop` intrinsic.
    pub(crate) fn waitable_set_drop(
        mut self: Pin<&mut Self>,
        caller_instance: RuntimeComponentInstanceIndex,
        set: u32,
    ) -> Result<()> {
        let rep = self.as_mut().guest_tables().0[caller_instance].waitable_set_remove(set)?;

        log::trace!("drop waitable set {rep} (handle {set})");

        let set = self
            .concurrent_state_mut()
            .delete(TableId::<WaitableSet>::new(rep))?;

        if !set.waiting.is_empty() {
            bail!("cannot drop waitable set with waiters");
        }

        Ok(())
    }

    /// Implements the `waitable.join` intrinsic.
    pub(crate) fn waitable_join(
        mut self: Pin<&mut Self>,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable_handle: u32,
        set_handle: u32,
    ) -> Result<()> {
        let waitable = Waitable::from_instance(self.as_mut(), caller_instance, waitable_handle)?;

        let set = if set_handle == 0 {
            None
        } else {
            let set =
                self.as_mut().guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;

            Some(TableId::<WaitableSet>::new(set))
        };

        log::trace!(
            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
        );

        waitable.join(self.concurrent_state_mut(), set)
    }

    /// Implements the `subtask.drop` intrinsic.
    pub(crate) fn subtask_drop(
        mut self: Pin<&mut Self>,
        caller_instance: RuntimeComponentInstanceIndex,
        task_id: u32,
    ) -> Result<()> {
        self.as_mut().waitable_join(caller_instance, task_id, 0)?;

        let (rep, is_host) =
            self.as_mut().guest_tables().0[caller_instance].subtask_remove(task_id)?;

        let concurrent_state = self.concurrent_state_mut();
        let (waitable, expected_caller_instance, delete) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            let task = concurrent_state.get(id)?;
            if task.join_handle.is_some() {
                bail!("cannot drop a subtask which has not yet resolved");
            }
            (Waitable::Host(id), task.caller_instance, true)
        } else {
            let id = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get(id)?;
            if task.lift_result.is_some() {
                bail!("cannot drop a subtask which has not yet resolved");
            }
            if let Caller::Guest { instance, .. } = &task.caller {
                (Waitable::Guest(id), *instance, task.exited)
            } else {
                unreachable!()
            }
        };

        waitable.common(concurrent_state)?.handle = None;

        if waitable.take_event(concurrent_state)?.is_some() {
            bail!("cannot drop a subtask with an undelivered event");
        }

        if delete {
            waitable.delete_from(concurrent_state)?;
        }

        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        assert_eq!(expected_caller_instance, caller_instance);
        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
        Ok(())
    }
}

impl Instance {
    /// Enable or disable concurrent state debugging mode for e.g. integration
    /// tests.
    ///
    /// This will avoid re-using deleted handles, making it easier to catch
    /// e.g. "use-after-delete" and "double-delete" errors.  It can also make
    /// reading trace output easier since it ensures handles are never
    /// repurposed.
    #[doc(hidden)]
    pub fn enable_concurrent_state_debug(&self, mut store: impl AsContextMut, enable: bool) {
        self.id()
            .get_mut(store.as_context_mut().0)
            .concurrent_state_mut()
            .table
            .enable_debug(enable);
        // TODO: do the same for the tables holding guest-facing handles
    }

    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this instance are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) {
        let mut instance = self.id().get_mut(store.as_context_mut().0);
        assert!(
            instance
                .as_mut()
                .guest_tables()
                .0
                .iter()
                .all(|(_, table)| table.is_empty())
        );
        let state = instance.concurrent_state_mut();
        assert!(state.table.is_empty(), "non-empty table: {:?}", state.table);
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_task.is_none());
        assert!(
            state
                .futures
                .get_mut()
                .unwrap()
                .as_ref()
                .unwrap()
                .is_empty()
        );
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Run the specified closure `fun` to completion as part of this instance's
    /// event loop.
    ///
    /// Like [`Self::run`], this will run `fun` as part of this instance's event
    /// loop until it yields a result _or_ there are no more tasks to run.
    /// Unlike [`Self::run`], `fun` is provided an [`Accessor`], which provides
    /// controlled access to the `Store` and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   anyhow::{Result},
    /// #   wasmtime::{
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// instance.run_concurrent(&mut store, async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<T, R>(
        self,
        mut store: impl AsContextMut<Data = T>,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        check_recursive_run();
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());

        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token, Some(self));
        let dropper = &mut Dropper {
            store,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        self.poll_until(dropper.store.as_context_mut(), future)
            .await
    }

    /// Spawn a background task to run as part of this instance's event loop.
    ///
    /// The task will receive an `&Accessor<U>` and run concurrently with
    /// any other tasks in progress for the instance.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this instance is run.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
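    ///
    /// A brief sketch (not compiled as a doctest; `Ctx` and the `Tick` task
    /// type, an [`AccessorTask`] implementation, are placeholders):
    ///
    /// ```ignore
    /// let handle = instance.spawn(&mut store, Tick);
    /// // The task only makes progress while the event loop runs, e.g. within
    /// // `instance.run_concurrent(...)`; `handle` can later be used to cancel
    /// // the task.
    /// ```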
    pub fn spawn<U: 'static>(
        self,
        mut store: impl AsContextMut<Data = U>,
        task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
    ) -> JoinHandle {
        let mut store = store.as_context_mut();
        let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
        self.spawn_with_accessor(store, accessor, task)
    }

    /// Internal implementation of `spawn` functions where a `store` is
    /// available along with an `Accessor`.
    fn spawn_with_accessor<T, D>(
        self,
        mut store: StoreContextMut<T>,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D, Result<()>>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        let store = store.as_context_mut();

        // Create an "abortable future" here where internally the future will
        // hook calls to poll and possibly spawn more background tasks on each
        // iteration.
        let (handle, future) =
            JoinHandle::run(async move { HostTaskOutput::Result(task.run(&accessor).await) });
        self.concurrent_state_mut(store.0)
            .push_future(Box::pin(async move {
                future.await.unwrap_or(HostTaskOutput::Result(Ok(())))
            }));

        handle
    }

    /// Run this instance's event loop.
    ///
    /// The returned future will resolve when either the specified future
    /// completes (in which case we return its result) or no further progress
    /// can be made (in which case we trap with `Trap::AsyncDeadlock`).
    async fn poll_until<T, R>(
        self,
        mut store: StoreContextMut<'_, T>,
        mut future: Pin<&mut impl Future<Output = R>>,
    ) -> Result<R>
    where
        T: Send,
    {
        loop {
            // Take `ConcurrentState::futures` out of the instance so we can
            // poll it while also safely giving any of the futures inside access
            // to `self`.
            let mut futures = self
                .concurrent_state_mut(store.0)
                .futures
                .get_mut()
                .unwrap()
                .take()
                .unwrap();
            let mut next = pin!(futures.next());

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            HostTaskOutput::Result(Err(e)) => return Poll::Ready(Err(e)),
                            HostTaskOutput::Result(Ok(())) => {}
                            HostTaskOutput::Function(fun) => {
                                // Defer calling this function to a worker fiber
                                // in case it involves calling a guest realloc
                                // function as part of a lowering operation.
                                //
                                // TODO: This isn't necessary for _all_
                                // `HostTaskOutput::Function`s, so we could
                                // optimize by adding another variant to
                                // `HostTaskOutput` to distinguish which ones
                                // need it and which don't.
1210                                self.concurrent_state_mut(store.0)
1211                                    .push_high_priority(WorkItem::WorkerFunction(Mutex::new(fun)))
1212                            }
1213                        }
1214                        Poll::Ready(true)
1215                    }
1216                    Poll::Ready(None) => Poll::Ready(false),
1217                    Poll::Pending => Poll::Pending,
1218                };
1219
1220                let mut instance = self.id().get_mut(store.0);
1221
1222                // Next, check the "high priority" work queue and return
1223                // immediately if it has at least one item.
1224                let state = instance.as_mut().concurrent_state_mut();
1225                let ready = mem::take(&mut state.high_priority);
1226                let ready = if ready.is_empty() {
1227                    // Next, check the "low priority" work queue and return
1228                    // immediately if it has at least one item.
1229                    let ready = mem::take(&mut state.low_priority);
1230                    if ready.is_empty() {
1231                        return match next {
1232                            Poll::Ready(true) => {
1233                                // In this case, one of the futures in
1234                                // `ConcurrentState::futures` completed
1235                                // successfully, so we return now and continue
1236                                // the outer loop in case there is another one
1237                                // ready to complete.
1238                                Poll::Ready(Ok(Either::Right(Vec::new())))
1239                            }
1240                            Poll::Ready(false) => {
1241                                // Poll the future we were passed one last time
1242                                // in case one of `ConcurrentState::futures` had
1243                                // the side effect of unblocking it.
1244                                if let Poll::Ready(value) =
1245                                    self.set_tls(store.0, || future.as_mut().poll(cx))
1246                                {
1247                                    Poll::Ready(Ok(Either::Left(value)))
1248                                } else {
1249                                    // In this case, there are no more pending
1250                                    // futures in `ConcurrentState::futures`,
1251                                    // there are no remaining work items, _and_
1252                                    // the future we were passed as an argument
1253                                    // still hasn't completed, meaning we're
1254                                    // stuck, so we return an error.  The
1255                                    // underlying assumption is that `future`
1256                                    // depends on this component instance making
1257                                    // such progress, and thus there's no point
1258                                    // in continuing to poll it given we've run
1259                                    // out of work to do.
1260                                    //
1261                                    // Note that we'd also reach this point if
1262                                    // the host embedder passed e.g. a
1263                                    // `std::future::Pending` to
1264                                    // `Instance::run_concurrent`, in which case
1265                                    // we'd return a "deadlock" error even when
1266                                    // any and all tasks have completed
1267                                    // normally.  However, that's not how
1268                                    // `Instance::run_concurrent` is intended
1269                                    // (and documented) to be used, so it seems
1270                                    // reasonable to lump that case in with
1271                                    // "real" deadlocks.
1272                                    //
1273                                    // TODO: Once we've added host APIs for
1274                                    // cancelling in-progress tasks, we can
1275                                    // return some other, non-error value here,
1276                                    // treating it as "normal" and giving the
1277                                    // host embedder a chance to intervene by
1278                                    // cancelling one or more tasks and/or
1279                                    // starting new tasks capable of waking the
1280                                    // existing ones.
1281                                    Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1282                                }
1283                            }
1284                            // There is at least one pending future in
1285                            // `ConcurrentState::futures` and we have nothing
1286                            // else to do but wait for now, so we return
1287                            // `Pending`.
1288                            Poll::Pending => Poll::Pending,
1289                        };
1290                    } else {
1291                        ready
1292                    }
1293                } else {
1294                    ready
1295                };
1296
1297                Poll::Ready(Ok(Either::Right(ready)))
1298            })
1299            .await;
1300
1301            // Put the `ConcurrentState::futures` back into the instance before
1302            // we return or handle any work items since one or more of those
1303            // items might append more futures.
1304            *self
1305                .concurrent_state_mut(store.0)
1306                .futures
1307                .get_mut()
1308                .unwrap() = Some(futures);
1309
1310            match result? {
1311                // The future we were passed as an argument completed, so we
1312                // return the result.
1313                Either::Left(value) => break Ok(value),
1314                // The future we were passed has not yet completed, so handle
1315                // any work items and then loop again.
1316                Either::Right(ready) => {
1317                    for item in ready {
1318                        self.handle_work_item(store.as_context_mut(), item).await?;
1319                    }
1320                }
1321            }
1322        }
1323    }
1324
1325    /// Handle the specified work item, possibly resuming a fiber if applicable.
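    ///
    /// As a rough illustration of the dispatch performed here, consider the
    /// following standalone sketch.  The `Item` enum and the two queues are
    /// hypothetical stand-ins; the real `WorkItem` variants carry futures,
    /// fibers, and guest calls, and the real queues live in `ConcurrentState`.
    ///
    /// ```ignore
    /// use std::collections::VecDeque;
    ///
    /// enum Item {
    ///     Run(Box<dyn FnOnce()>),
    ///     Yielded(Box<dyn FnOnce()>),
    /// }
    ///
    /// fn dispatch(high: &mut VecDeque<Item>, low: &mut VecDeque<Item>) {
    ///     // Drain high-priority items before low-priority ones.
    ///     while let Some(item) = high.pop_front().or_else(|| low.pop_front()) {
    ///         match item {
    ///             Item::Run(f) => f(),
    ///             // Work which yielded goes to the back of the low-priority
    ///             // queue so other queued work gets a chance to run first.
    ///             Item::Yielded(f) => low.push_back(Item::Run(f)),
    ///         }
    ///     }
    /// }
    /// ```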
1326    async fn handle_work_item<T: Send>(
1327        self,
1328        store: StoreContextMut<'_, T>,
1329        item: WorkItem,
1330    ) -> Result<()> {
1331        log::trace!("handle work item {item:?}");
1332        match item {
1333            WorkItem::PushFuture(future) => {
1334                self.concurrent_state_mut(store.0)
1335                    .futures
1336                    .get_mut()
1337                    .unwrap()
1338                    .as_mut()
1339                    .unwrap()
1340                    .push(future.into_inner().unwrap());
1341            }
1342            WorkItem::ResumeFiber(fiber) => {
1343                self.resume_fiber(store.0, fiber).await?;
1344            }
1345            WorkItem::GuestCall(call) => {
1346                let state = self.concurrent_state_mut(store.0);
1347                if call.is_ready(state)? {
1348                    self.run_on_worker(store, WorkerItem::GuestCall(call))
1349                        .await?;
1350                } else {
1351                    let task = state.get_mut(call.task)?;
1352                    if !task.starting_sent {
1353                        task.starting_sent = true;
1354                        if let GuestCallKind::Start(_) = &call.kind {
1355                            Waitable::Guest(call.task).set_event(
1356                                state,
1357                                Some(Event::Subtask {
1358                                    status: Status::Starting,
1359                                }),
1360                            )?;
1361                        }
1362                    }
1363
1364                    let runtime_instance = state.get(call.task)?.instance;
1365                    state
1366                        .instance_state(runtime_instance)
1367                        .pending
1368                        .insert(call.task, call.kind);
1369                }
1370            }
1371            WorkItem::Poll(params) => {
1372                let state = self.concurrent_state_mut(store.0);
1373                if state.get_mut(params.task)?.event.is_some()
1374                    || !state.get_mut(params.set)?.ready.is_empty()
1375                {
1376                    // There's at least one event immediately available; deliver
1377                    // it to the guest ASAP.
1378                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1379                        task: params.task,
1380                        kind: GuestCallKind::DeliverEvent {
1381                            set: Some(params.set),
1382                        },
1383                    }));
1384                } else {
1385                    // There are no events immediately available; deliver
1386                    // `Event::None` to the guest.
1387                    state.get_mut(params.task)?.event = Some(Event::None);
1388                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1389                        task: params.task,
1390                        kind: GuestCallKind::DeliverEvent {
1391                            set: Some(params.set),
1392                        },
1393                    }));
1394                }
1395            }
1396            WorkItem::WorkerFunction(fun) => {
1397                self.run_on_worker(store, WorkerItem::Function(fun)).await?;
1398            }
1399        }
1400
1401        Ok(())
1402    }
1403
1404    /// Resume the specified fiber, giving it exclusive access to the specified
1405    /// store.
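    ///
    /// When the fiber suspends again instead of completing, the
    /// `SuspendReason` it recorded determines where it is parked next.  The
    /// following schematic uses hypothetical `Fiber` and `Scheduler` types
    /// (and keys waiters by set only) purely to illustrate that decision:
    ///
    /// ```ignore
    /// use std::collections::{HashMap, VecDeque};
    ///
    /// struct Fiber;
    /// type SetId = u32;
    ///
    /// struct Scheduler {
    ///     idle_worker: Option<Fiber>,
    ///     low_priority: VecDeque<Fiber>,
    ///     waiters: HashMap<SetId, Fiber>,
    /// }
    ///
    /// enum Reason {
    ///     NeedWork,
    ///     Yielding,
    ///     Waiting(SetId),
    /// }
    ///
    /// fn park(scheduler: &mut Scheduler, fiber: Fiber, reason: Reason) {
    ///     match reason {
    ///         // Keep at most one idle worker around for reuse.
    ///         Reason::NeedWork if scheduler.idle_worker.is_none() => {
    ///             scheduler.idle_worker = Some(fiber);
    ///         }
    ///         Reason::NeedWork => drop(fiber),
    ///         // Yielded fibers run again after other queued work has had a turn.
    ///         Reason::Yielding => scheduler.low_priority.push_back(fiber),
    ///         // Waiting fibers sleep until an event arrives for their set.
    ///         Reason::Waiting(set) => {
    ///             scheduler.waiters.insert(set, fiber);
    ///         }
    ///     }
    /// }
    /// ```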
1406    async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> {
1407        let old_task = self.concurrent_state_mut(store).guest_task;
1408        log::trace!("resume_fiber: save current task {old_task:?}");
1409
1410        let fiber = fiber::resolve_or_release(store, fiber).await?;
1411
1412        let state = self.concurrent_state_mut(store);
1413
1414        state.guest_task = old_task;
1415        log::trace!("resume_fiber: restore current task {old_task:?}");
1416
1417        if let Some(mut fiber) = fiber {
1418            // See the `SuspendReason` documentation for what each case means.
1419            match state.suspend_reason.take().unwrap() {
1420                SuspendReason::NeedWork => {
1421                    if state.worker.is_none() {
1422                        state.worker = Some(fiber);
1423                    } else {
1424                        fiber.dispose(store);
1425                    }
1426                }
1427                SuspendReason::Yielding { .. } => {
1428                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1429                }
1430                SuspendReason::Waiting { set, task } => {
1431                    let old = state
1432                        .get_mut(set)?
1433                        .waiting
1434                        .insert(task, WaitMode::Fiber(fiber));
1435                    assert!(old.is_none());
1436                }
1437            }
1438        }
1439
1440        Ok(())
1441    }
1442
1443    /// Execute the specified worker item on a worker fiber.
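    ///
    /// A worker fiber is cached on `ConcurrentState` when idle, so this
    /// function reuses the cached fiber when available and only creates a new
    /// one otherwise.  A minimal sketch of that take-or-create idiom, using a
    /// hypothetical `Worker` type rather than the real fiber machinery:
    ///
    /// ```ignore
    /// struct Worker;
    ///
    /// impl Worker {
    ///     fn new() -> Self {
    ///         Worker
    ///     }
    /// }
    ///
    /// fn checkout(cached: &mut Option<Worker>) -> Worker {
    ///     // Prefer the cached worker; fall back to creating a fresh one.
    ///     cached.take().unwrap_or_else(Worker::new)
    /// }
    /// ```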
1444    async fn run_on_worker<T: Send>(
1445        self,
1446        store: StoreContextMut<'_, T>,
1447        item: WorkerItem,
1448    ) -> Result<()> {
1449        let worker = if let Some(fiber) = self.concurrent_state_mut(store.0).worker.take() {
1450            fiber
1451        } else {
1452            fiber::make_fiber(store.0, move |store| {
1453                loop {
1454                    match self.concurrent_state_mut(store).worker_item.take().unwrap() {
1455                        WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
1456                        WorkerItem::Function(fun) => fun.into_inner().unwrap()(store, self)?,
1457                    }
1458
1459                    self.suspend(store, SuspendReason::NeedWork)?;
1460                }
1461            })?
1462        };
1463
1464        let worker_item = &mut self.concurrent_state_mut(store.0).worker_item;
1465        assert!(worker_item.is_none());
1466        *worker_item = Some(item);
1467
1468        self.resume_fiber(store.0, worker).await
1469    }
1470
1471    /// Execute the specified guest call.
1472    fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
1473        match call.kind {
1474            GuestCallKind::DeliverEvent { set } => {
1475                let (event, waitable) =
1476                    self.id().get_mut(store).get_event(call.task, set)?.unwrap();
1477                let state = self.concurrent_state_mut(store);
1478                let task = state.get_mut(call.task)?;
1479                let runtime_instance = task.instance;
1480                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
1481
1482                log::trace!(
1483                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
1484                    call.task,
1485                );
1486
1487                let old_task = state.guest_task.replace(call.task);
1488                log::trace!(
1489                    "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
1490                    call.task
1491                );
1492
1493                self.maybe_push_call_context(store.store_opaque_mut(), call.task)?;
1494
1495                let state = self.concurrent_state_mut(store);
1496                state.enter_instance(runtime_instance);
1497
1498                let callback = state.get_mut(call.task)?.callback.take().unwrap();
1499
1500                let code = callback(store, self, runtime_instance, event, handle)?;
1501
1502                let state = self.concurrent_state_mut(store);
1503
1504                state.get_mut(call.task)?.callback = Some(callback);
1505
1506                state.exit_instance(runtime_instance)?;
1507
1508                self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?;
1509
1510                self.id().get_mut(store).handle_callback_code(
1511                    call.task,
1512                    runtime_instance,
1513                    code,
1514                    false,
1515                )?;
1516
1517                self.concurrent_state_mut(store).guest_task = old_task;
1518                log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
1519            }
1520            GuestCallKind::Start(fun) => {
1521                fun(store, self)?;
1522            }
1523        }
1524
1525        Ok(())
1526    }
1527
1528    /// Suspend the current fiber, storing the reason in
1529    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1530    /// it should be resumed.
1531    ///
1532    /// See the `SuspendReason` documentation for details.
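    ///
    /// Note the bracketing around the suspension point below: state that must
    /// not be held across the suspension (the task's resource-borrow call
    /// context) is parked before yielding and reinstated after resuming.  In
    /// ordinary async code the same shape looks roughly like this sketch
    /// (`slot` and `current` are hypothetical stand-ins for the task and
    /// store state involved):
    ///
    /// ```ignore
    /// async fn with_state_parked<S>(
    ///     slot: &mut Option<S>,
    ///     current: &mut Option<S>,
    ///     wait: impl std::future::Future<Output = ()>,
    /// ) {
    ///     // Park the in-flight state somewhere stable before giving up control...
    ///     *slot = current.take();
    ///     wait.await;
    ///     // ...and reinstate it once control returns to this task.
    ///     *current = slot.take();
    /// }
    /// ```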
1533    fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> {
1534        log::trace!("suspend fiber: {reason:?}");
1535
1536        // If we're yielding or waiting on behalf of a guest task, we'll need to
1537        // pop the call context which manages resource borrows before suspending
1538        // and then push it again once we've resumed.
1539        let task = match &reason {
1540            SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
1541            SuspendReason::NeedWork => None,
1542        };
1543
1544        let old_guest_task = if let Some(task) = task {
1545            self.maybe_pop_call_context(store, task)?;
1546            self.concurrent_state_mut(store).guest_task
1547        } else {
1548            None
1549        };
1550
1551        let suspend_reason = &mut self.concurrent_state_mut(store).suspend_reason;
1552        assert!(suspend_reason.is_none());
1553        *suspend_reason = Some(reason);
1554
1555        store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1556
1557        if let Some(task) = task {
1558            self.concurrent_state_mut(store).guest_task = old_guest_task;
1559            self.maybe_push_call_context(store, task)?;
1560        }
1561
1562        Ok(())
1563    }
1564
1565    /// Push the call context for managing resource borrows for the specified
1566    /// guest task if it has not yet either returned a result or cancelled
1567    /// itself.
1568    fn maybe_push_call_context(
1569        self,
1570        store: &mut StoreOpaque,
1571        guest_task: TableId<GuestTask>,
1572    ) -> Result<()> {
1573        let task = self.concurrent_state_mut(store).get_mut(guest_task)?;
1574        if task.lift_result.is_some() {
1575            log::trace!("push call context for {guest_task:?}");
1576            let call_context = task.call_context.take().unwrap();
1577            store.component_resource_state().0.push(call_context);
1578        }
1579        Ok(())
1580    }
1581
1582    /// Pop the call context for managing resource borrows for the specified
1583    /// guest task if it has not yet either returned a result or cancelled
1584    /// itself.
1585    fn maybe_pop_call_context(
1586        self,
1587        store: &mut StoreOpaque,
1588        guest_task: TableId<GuestTask>,
1589    ) -> Result<()> {
1590        if self
1591            .concurrent_state_mut(store)
1592            .get(guest_task)?
1593            .lift_result
1594            .is_some()
1595        {
1596            log::trace!("pop call context for {guest_task:?}");
1597            let call_context = Some(store.component_resource_state().0.pop().unwrap());
1598            self.concurrent_state_mut(store)
1599                .get_mut(guest_task)?
1600                .call_context = call_context;
1601        }
1602        Ok(())
1603    }
1604
1605    /// Add the specified guest call to the "high priority" work item queue, to
1606    /// be started as soon as backpressure and/or reentrance rules allow.
1607    ///
1608    /// SAFETY: The raw pointer arguments must be valid references to guest
1609    /// functions (with the appropriate signatures) when the closures queued by
1610    /// this function are called.
1611    unsafe fn queue_call<T: 'static>(
1612        self,
1613        mut store: StoreContextMut<T>,
1614        guest_task: TableId<GuestTask>,
1615        callee: SendSyncPtr<VMFuncRef>,
1616        param_count: usize,
1617        result_count: usize,
1618        flags: Option<InstanceFlags>,
1619        async_: bool,
1620        callback: Option<SendSyncPtr<VMFuncRef>>,
1621        post_return: Option<SendSyncPtr<VMFuncRef>>,
1622    ) -> Result<()> {
1623        /// Return a closure which will call the specified function in the scope
1624        /// of the specified task.
1625        ///
1626        /// This will use `GuestTask::lower_params` to lower the parameters, but
1627        /// will not lift the result; instead, it returns a
1628        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1629        /// any, may be lifted.  Note that an async-lifted export will have
1630        /// returned its result using the `task.return` intrinsic (or not
1631        /// returned a result at all, in the case of `task.cancel`), in which
1632        /// case the "result" of this call will either be a callback code or
1633        /// nothing.
1634        ///
1635        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1636        /// the returned closure is called.
1637        unsafe fn make_call<T: 'static>(
1638            store: StoreContextMut<T>,
1639            guest_task: TableId<GuestTask>,
1640            callee: SendSyncPtr<VMFuncRef>,
1641            param_count: usize,
1642            result_count: usize,
1643            flags: Option<InstanceFlags>,
1644        ) -> impl FnOnce(
1645            &mut dyn VMStore,
1646            Instance,
1647        ) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1648        + Send
1649        + Sync
1650        + 'static
1651        + use<T> {
1652            let token = StoreToken::new(store);
1653            move |store: &mut dyn VMStore, instance: Instance| {
1654                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1655                let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1656                let may_enter_after_call = task.call_post_return_automatically();
1657                let lower = task.lower_params.take().unwrap();
1658
1659                lower(store, instance, &mut storage[..param_count])?;
1660
1661                let mut store = token.as_context_mut(store);
1662
1663                // SAFETY: Per the contract documented on `make_call`,
1664                // `callee` must be a valid pointer.
1665                unsafe {
1666                    if let Some(mut flags) = flags {
1667                        flags.set_may_enter(false);
1668                    }
1669                    crate::Func::call_unchecked_raw(
1670                        &mut store,
1671                        callee.as_non_null(),
1672                        NonNull::new(
1673                            &mut storage[..param_count.max(result_count)]
1674                                as *mut [MaybeUninit<ValRaw>] as _,
1675                        )
1676                        .unwrap(),
1677                    )?;
1678                    if let Some(mut flags) = flags {
1679                        flags.set_may_enter(may_enter_after_call);
1680                    }
1681                }
1682
1683                Ok(storage)
1684            }
1685        }
1686
1687        // SAFETY: Per the contract described in this function documentation,
1688        // the `callee` pointer which `call` closes over must be valid when
1689        // called by the closure we queue below.
1690        let call = unsafe {
1691            make_call(
1692                store.as_context_mut(),
1693                guest_task,
1694                callee,
1695                param_count,
1696                result_count,
1697                flags,
1698            )
1699        };
1700
1701        let callee_instance = self.concurrent_state_mut(store.0).get(guest_task)?.instance;
1702        let fun = if callback.is_some() {
1703            assert!(async_);
1704
1705            Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1706                let old_task = instance
1707                    .concurrent_state_mut(store)
1708                    .guest_task
1709                    .replace(guest_task);
1710                log::trace!(
1711                    "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
1712                );
1713
1714                instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1715
1716                instance
1717                    .concurrent_state_mut(store)
1718                    .enter_instance(callee_instance);
1719
1720                // SAFETY: See the documentation for `make_call` to review the
1721                // contract we must uphold for `call` here.
1722                //
1723                // Per the contract described in the `queue_call`
1724                // documentation, the `callee` pointer which `call` closes
1725                // over must be valid.
1726                let storage = call(store, instance)?;
1727
1728                instance
1729                    .concurrent_state_mut(store)
1730                    .exit_instance(callee_instance)?;
1731
1732                instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1733
1734                let state = instance.concurrent_state_mut(store);
1735                state.guest_task = old_task;
1736                log::trace!("stackless call: restored {old_task:?} as current task");
1737
1738                // SAFETY: `wasmparser` will have validated that the callback
1739                // function returns a `i32` result.
1740                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1741
1742                instance.id().get_mut(store).handle_callback_code(
1743                    guest_task,
1744                    callee_instance,
1745                    code,
1746                    true,
1747                )?;
1748
1749                Ok(())
1750            })
1751                as Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>
1752        } else {
1753            let token = StoreToken::new(store.as_context_mut());
1754            Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1755                let old_task = instance
1756                    .concurrent_state_mut(store)
1757                    .guest_task
1758                    .replace(guest_task);
1759                log::trace!(
1760                    "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
1761                );
1762
1763                let mut flags = instance.id().get(store).instance_flags(callee_instance);
1764
1765                instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1766
1767                // Unless this is a callback-less (i.e. stackful)
1768                // async-lifted export, we need to record that the instance
1769                // cannot be entered until the call returns.
1770                if !async_ {
1771                    instance
1772                        .concurrent_state_mut(store)
1773                        .enter_instance(callee_instance);
1774                }
1775
1776                // SAFETY: See the documentation for `make_call` to review the
1777                // contract we must uphold for `call` here.
1778                //
1779                // Per the contract described in the `queue_call`
1780                // documentation, the `callee` pointer which `call` closes
1781                // over must be valid.
1782                let storage = call(store, instance)?;
1783
1784                if async_ {
1785                    // This is a callback-less (i.e. stackful) async-lifted
1786                    // export, so there is no post-return function, and
1787                    // either `task.return` or `task.cancel` should have
1788                    // been called.
1789                    if instance
1790                        .concurrent_state_mut(store)
1791                        .get(guest_task)?
1792                        .lift_result
1793                        .is_some()
1794                    {
1795                        return Err(anyhow!(crate::Trap::NoAsyncResult));
1796                    }
1797                } else {
1798                    // This is a sync-lifted export, so now is when we lift the
1799                    // result, optionally call the post-return function, if any,
1800                    // and finally notify any current or future waiters that the
1801                    // subtask has returned.
1802
1803                    let lift = {
1804                        let state = instance.concurrent_state_mut(store);
1805                        state.exit_instance(callee_instance)?;
1806
1807                        assert!(state.get(guest_task)?.result.is_none());
1808
1809                        state.get_mut(guest_task)?.lift_result.take().unwrap()
1810                    };
1811
1812                    // SAFETY: Per the core signature validated by `wasmparser`, the
1813                    // call above initialized the first `result_count` elements of `storage`.
1814                    let result = (lift.lift)(store, instance, unsafe {
1815                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1816                            &storage[..result_count],
1817                        )
1818                    })?;
1819
1820                    let post_return_arg = match result_count {
1821                        0 => ValRaw::i32(0),
1822                        // SAFETY: `result_count` represents the number of
1823                        // core Wasm results returned, per `wasmparser`.
1824                        1 => unsafe { storage[0].assume_init() },
1825                        _ => unreachable!(),
1826                    };
1827
1828                    if instance
1829                        .concurrent_state_mut(store)
1830                        .get(guest_task)?
1831                        .call_post_return_automatically()
1832                    {
1833                        unsafe { flags.set_needs_post_return(false) }
1834
1835                        if let Some(func) = post_return {
1836                            let mut store = token.as_context_mut(store);
1837
1838                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1839                            // either `wasmtime-cranelift`-generated fused adapter
1840                            // code or `component::Options`.  Per `wasmparser`
1841                            // post-return signature validation, we know it takes a
1842                            // single parameter.
1843                            unsafe {
1844                                crate::Func::call_unchecked_raw(
1845                                    &mut store,
1846                                    func.as_non_null(),
1847                                    slice::from_ref(&post_return_arg).into(),
1848                                )?;
1849                            }
1850                        }
1851
1852                        unsafe { flags.set_may_enter(true) }
1853                    }
1854
1855                    instance.task_complete(
1856                        store,
1857                        guest_task,
1858                        result,
1859                        Status::Returned,
1860                        post_return_arg,
1861                    )?;
1862                }
1863
1864                instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1865
1866                let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1867
1868                match &task.caller {
1869                    Caller::Host {
1870                        remove_task_automatically,
1871                        ..
1872                    } => {
1873                        if *remove_task_automatically {
1874                            Waitable::Guest(guest_task)
1875                                .delete_from(instance.concurrent_state_mut(store))?;
1876                        }
1877                    }
1878                    Caller::Guest { .. } => {
1879                        task.exited = true;
1880                    }
1881                }
1882
1883                Ok(())
1884            })
1885        };
1886
1887        self.concurrent_state_mut(store.0)
1888            .push_high_priority(WorkItem::GuestCall(GuestCall {
1889                task: guest_task,
1890                kind: GuestCallKind::Start(fun),
1891            }));
1892
1893        Ok(())
1894    }
1895
1896    /// Prepare (but do not start) a guest->guest call.
1897    ///
1898    /// This is called from fused adapter code generated in
1899    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
1900    /// are synthesized Wasm functions which move the parameters from the caller
1901    /// to the callee and the result from the callee to the caller,
1902    /// respectively.  The adapter will call `Self::start_call` immediately
1903    /// after calling this function.
1904    ///
1905    /// SAFETY: All the pointer arguments must be valid pointers to guest
1906    /// entities (and with the expected signatures for the function references
1907    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
1908    unsafe fn prepare_call<T: 'static>(
1909        self,
1910        mut store: StoreContextMut<T>,
1911        start: *mut VMFuncRef,
1912        return_: *mut VMFuncRef,
1913        caller_instance: RuntimeComponentInstanceIndex,
1914        callee_instance: RuntimeComponentInstanceIndex,
1915        task_return_type: TypeTupleIndex,
1916        memory: *mut VMMemoryDefinition,
1917        string_encoding: u8,
1918        caller_info: CallerInfo,
1919    ) -> Result<()> {
1920        enum ResultInfo {
1921            Heap { results: u32 },
1922            Stack { result_count: u32 },
1923        }
1924
1925        let result_info = match &caller_info {
1926            CallerInfo::Async {
1927                has_result: true,
1928                params,
1929            } => ResultInfo::Heap {
1930                results: params.last().unwrap().get_u32(),
1931            },
1932            CallerInfo::Async {
1933                has_result: false, ..
1934            } => ResultInfo::Stack { result_count: 0 },
1935            CallerInfo::Sync {
1936                result_count,
1937                params,
1938            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
1939                results: params.last().unwrap().get_u32(),
1940            },
1941            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
1942                result_count: *result_count,
1943            },
1944        };
1945
1946        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
1947
1948        // Create a new guest task for the call, closing over the `start` and
1949        // `return_` functions to lift the parameters and lower the result,
1950        // respectively.
1951        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
1952        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
1953        let token = StoreToken::new(store.as_context_mut());
1954        let state = self.concurrent_state_mut(store.0);
1955        let old_task = state.guest_task.take();
1956        let new_task = GuestTask::new(
1957            state,
1958            Box::new(move |store, instance, dst| {
1959                let mut store = token.as_context_mut(store);
1960                assert!(dst.len() <= MAX_FLAT_PARAMS);
1961                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1962                let count = match caller_info {
1963                    // Async callers, if they have a result, use the last
1964                    // parameter as a return pointer, so strip that trailing
1965                    // parameter off here when `has_result` is set.
1966                    CallerInfo::Async { params, has_result } => {
1967                        let params = &params[..params.len() - usize::from(has_result)];
1968                        for (param, src) in params.iter().zip(&mut src) {
1969                            src.write(*param);
1970                        }
1971                        params.len()
1972                    }
1973
1974                    // Sync callers forward everything directly.
1975                    CallerInfo::Sync { params, .. } => {
1976                        for (param, src) in params.iter().zip(&mut src) {
1977                            src.write(*param);
1978                        }
1979                        params.len()
1980                    }
1981                };
1982                // SAFETY: `start` is a valid `*mut VMFuncRef` from
1983                // `wasmtime-cranelift`-generated fused adapter code.  Based on
1984                // how it was constructed (see
1985                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
1986                // for details) we know it takes `count` parameters and returns
1987                // `dst.len()` results.
1988                unsafe {
1989                    crate::Func::call_unchecked_raw(
1990                        &mut store,
1991                        start.as_non_null(),
1992                        NonNull::new(
1993                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
1994                        )
1995                        .unwrap(),
1996                    )?;
1997                }
1998                dst.copy_from_slice(&src[..dst.len()]);
1999                let state = instance.concurrent_state_mut(store.0);
2000                let task = state.guest_task.unwrap();
2001                Waitable::Guest(task).set_event(
2002                    state,
2003                    Some(Event::Subtask {
2004                        status: Status::Started,
2005                    }),
2006                )?;
2007                Ok(())
2008            }),
2009            LiftResult {
2010                lift: Box::new(move |store, instance, src| {
2011                    // SAFETY: See comment in closure passed as `lower_params`
2012                    // parameter above.
2013                    let mut store = token.as_context_mut(store);
2014                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2015                    if let ResultInfo::Heap { results } = &result_info {
2016                        my_src.push(ValRaw::u32(*results));
2017                    }
2018                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2019                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2020                    // on how it was constructed (see
2021                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2022                    // for details) we know it takes `src.len()` parameters and
2023                    // returns up to 1 result.
2024                    unsafe {
2025                        crate::Func::call_unchecked_raw(
2026                            &mut store,
2027                            return_.as_non_null(),
2028                            my_src.as_mut_slice().into(),
2029                        )?;
2030                    }
2031                    let state = instance.concurrent_state_mut(store.0);
2032                    let task = state.guest_task.unwrap();
2033                    if sync_caller {
2034                        state.get_mut(task)?.sync_result =
2035                            Some(if let ResultInfo::Stack { result_count } = &result_info {
2036                                match result_count {
2037                                    0 => None,
2038                                    1 => Some(my_src[0]),
2039                                    _ => unreachable!(),
2040                                }
2041                            } else {
2042                                None
2043                            });
2044                    }
2045                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2046                }),
2047                ty: task_return_type,
2048                memory: NonNull::new(memory).map(SendSyncPtr::new),
2049                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2050            },
2051            Caller::Guest {
2052                task: old_task.unwrap(),
2053                instance: caller_instance,
2054            },
2055            None,
2056            callee_instance,
2057        )?;
2058
2059        let guest_task = state.push(new_task)?;
2060
2061        if let Some(old_task) = old_task {
2062            if !state.may_enter(guest_task) {
2063                bail!(crate::Trap::CannotEnterComponent);
2064            }
2065
2066            state.get_mut(old_task)?.subtasks.insert(guest_task);
2067        };
2068
2069        // Make the new task the current one so that `Self::start_call` knows
2070        // which one to start.
2071        state.guest_task = Some(guest_task);
2072        log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2073
2074        Ok(())
2075    }
2076
2077    /// Call the specified callback function for an async-lifted export.
2078    ///
2079    /// SAFETY: `function` must be a valid reference to a guest function of the
2080    /// correct signature for a callback.
2081    unsafe fn call_callback<T>(
2082        self,
2083        mut store: StoreContextMut<T>,
2084        callee_instance: RuntimeComponentInstanceIndex,
2085        function: SendSyncPtr<VMFuncRef>,
2086        event: Event,
2087        handle: u32,
2088        may_enter_after_call: bool,
2089    ) -> Result<u32> {
2090        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2091
2092        let (ordinal, result) = event.parts();
2093        let params = &mut [
2094            ValRaw::u32(ordinal),
2095            ValRaw::u32(handle),
2096            ValRaw::u32(result),
2097        ];
2098        // SAFETY: `function` is a valid `*mut VMFuncRef` from either
2099        // `wasmtime-cranelift`-generated fused adapter code or
2100        // `component::Options`.  Per `wasmparser` callback signature
2101        // validation, we know it takes three parameters and returns one.
2102        unsafe {
2103            flags.set_may_enter(false);
2104            crate::Func::call_unchecked_raw(
2105                &mut store,
2106                function.as_non_null(),
2107                params.as_mut_slice().into(),
2108            )?;
2109            flags.set_may_enter(may_enter_after_call);
2110        }
2111        Ok(params[0].get_u32())
2112    }
2113
2114    /// Start a guest->guest call previously prepared using
2115    /// `Self::prepare_call`.
2116    ///
2117    /// This is called from fused adapter code generated in
2118    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2119    /// this function immediately after calling `Self::prepare_call`.
2120    ///
2121    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2122    /// functions with the appropriate signatures for the current guest task.
2123    /// If this is a call to an async-lowered import, the actual call may be
2124    /// deferred and run after this function returns, in which case the pointer
2125    /// arguments must also be valid when the call happens.
2126    unsafe fn start_call<T: 'static>(
2127        self,
2128        mut store: StoreContextMut<T>,
2129        callback: *mut VMFuncRef,
2130        post_return: *mut VMFuncRef,
2131        callee: *mut VMFuncRef,
2132        param_count: u32,
2133        result_count: u32,
2134        flags: u32,
2135        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2136    ) -> Result<u32> {
2137        let token = StoreToken::new(store.as_context_mut());
2138        let async_caller = storage.is_none();
2139        let state = self.concurrent_state_mut(store.0);
2140        let guest_task = state.guest_task.unwrap();
2141        let may_enter_after_call = state.get(guest_task)?.call_post_return_automatically();
2142        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2143        let param_count = usize::try_from(param_count).unwrap();
2144        assert!(param_count <= MAX_FLAT_PARAMS);
2145        let result_count = usize::try_from(result_count).unwrap();
2146        assert!(result_count <= MAX_FLAT_RESULTS);
2147
2148        let task = state.get_mut(guest_task)?;
2149        if !callback.is_null() {
2150            // We're calling an async-lifted export with a callback, so store
2151            // the callback and related context as part of the task so we can
2152            // call it later when needed.
2153            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2154            task.callback = Some(Box::new(
2155                move |store, instance, runtime_instance, event, handle| {
2156                    let store = token.as_context_mut(store);
2157                    unsafe {
2158                        instance.call_callback::<T>(
2159                            store,
2160                            runtime_instance,
2161                            callback,
2162                            event,
2163                            handle,
2164                            may_enter_after_call,
2165                        )
2166                    }
2167                },
2168            ));
2169        }
2170
2171        let Caller::Guest {
2172            task: caller,
2173            instance: runtime_instance,
2174        } = &task.caller
2175        else {
2176            // As of this writing, `start_call` is only used for guest->guest
2177            // calls.
2178            unreachable!()
2179        };
2180        let caller = *caller;
2181        let caller_instance = *runtime_instance;
2182
2183        let callee_instance = task.instance;
2184
2185        let instance_flags = if callback.is_null() {
2186            None
2187        } else {
2188            Some(self.id().get(store.0).instance_flags(callee_instance))
2189        };
2190
2191        // Queue the call as a "high priority" work item.
2192        unsafe {
2193            self.queue_call(
2194                store.as_context_mut(),
2195                guest_task,
2196                callee,
2197                param_count,
2198                result_count,
2199                instance_flags,
2200                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2201                NonNull::new(callback).map(SendSyncPtr::new),
2202                NonNull::new(post_return).map(SendSyncPtr::new),
2203            )?;
2204        }
2205
2206        let state = self.concurrent_state_mut(store.0);
2207
2208        // Use the caller's `GuestTask::sync_call_set` to register interest in
2209        // the subtask...
2210        let guest_waitable = Waitable::Guest(guest_task);
2211        let old_set = guest_waitable.common(state)?.set;
2212        let set = state.get_mut(caller)?.sync_call_set;
2213        guest_waitable.join(state, Some(set))?;
2214
2215        // ... and suspend this fiber temporarily while we wait for it to start.
2216        //
2217        // Note that we _could_ call the callee directly using the current fiber
2218        // rather than suspend this one, but that would make reasoning about the
2219        // event loop more complicated and is probably only worth doing if
2220        // there's a measurable performance benefit.  In addition, it would mean
2221        // blocking the caller if the callee calls a blocking sync-lowered
2222        // import, and as of this writing the spec says we must not do that.
2223        //
2224        // Alternatively, the fused adapter code could be modified to call the
2225        // callee directly without calling a host-provided intrinsic at all (in
2226        // which case it would need to do its own, inline backpressure checks,
2227        // etc.).  Again, we'd want to see a measurable performance benefit
2228        // before committing to such an optimization.  And again, we'd need to
2229        // update the spec to allow that.
2230        let (status, waitable) = loop {
2231            self.suspend(store.0, SuspendReason::Waiting { set, task: caller })?;
2232
2233            let state = self.concurrent_state_mut(store.0);
2234
2235            let event = guest_waitable.take_event(state)?;
2236            let Some(Event::Subtask { status }) = event else {
2237                unreachable!();
2238            };
2239
2240            log::trace!("status {status:?} for {guest_task:?}");
2241
2242            if status == Status::Returned {
2243                // It returned, so we can stop waiting.
2244                break (status, None);
2245            } else if async_caller {
2246                // It hasn't returned yet, but the caller is calling via an
2247                // async-lowered import, so we generate a handle for the task
2248                // waitable and return the status.
2249                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2250                    .subtask_insert_guest(guest_task.rep())?;
2251                self.concurrent_state_mut(store.0)
2252                    .get_mut(guest_task)?
2253                    .common
2254                    .handle = Some(handle);
2255                break (status, Some(handle));
2256            } else {
2257                // The callee hasn't returned yet, and the caller is calling via
2258                // a sync-lowered import, so we loop and keep waiting until the
2259                // callee returns.
2260            }
2261        };
2262
2263        let state = self.concurrent_state_mut(store.0);
2264
2265        guest_waitable.join(state, old_set)?;
2266
2267        if let Some(storage) = storage {
2268            // The caller used a sync-lowered import to call an async-lifted
2269            // export, in which case the result, if any, has been stashed in
2270            // `GuestTask::sync_result`.
2271            if let Some(result) = state.get_mut(guest_task)?.sync_result.take() {
2272                if let Some(result) = result {
2273                    storage[0] = MaybeUninit::new(result);
2274                }
2275
2276                Waitable::Guest(guest_task).delete_from(state)?;
2277            } else {
2278                // This means the callee failed to call either `task.return` or
2279                // `task.cancel` before exiting.
2280                return Err(anyhow!(crate::Trap::NoAsyncResult));
2281            }
2282        }
2283
2284        // Reset the current task to point to the caller as it resumes control.
2285        state.guest_task = Some(caller);
2286        log::trace!("popped current task {guest_task:?}; new task is {caller:?}");
2287
2288        Ok(status.pack(waitable))
2289    }
2290
2291    /// Wrap the specified host function in a future which will call it, passing
2292    /// it an `&Accessor<T>`.
2293    ///
2294    /// See the `Accessor` documentation for details.
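    ///
    /// For illustration, something matching the shape of the `F` bound below
    /// could be written as a free function returning a boxed future.  The
    /// `u32` result type and the body are hypothetical; only the signature
    /// shape is taken from the bound:
    ///
    /// ```ignore
    /// use std::future::Future;
    /// use std::pin::Pin;
    /// use anyhow::Result;
    ///
    /// fn host_work<T: 'static>(
    ///     _accessor: &Accessor<T>,
    /// ) -> Pin<Box<dyn Future<Output = Result<u32>> + Send + '_>> {
    ///     Box::pin(async move {
    ///         // Real host code would do its work here, using the accessor to
    ///         // reach the store as needed.
    ///         Ok(42u32)
    ///     })
    /// }
    /// ```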
2295    pub(crate) fn wrap_call<T, F, R>(
2296        self,
2297        store: StoreContextMut<T>,
2298        closure: F,
2299    ) -> impl Future<Output = Result<R>> + 'static
2300    where
2301        T: 'static,
2302        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
2303            + Send
2304            + Sync
2305            + 'static,
2306        R: Send + Sync + 'static,
2307    {
2308        let token = StoreToken::new(store);
2309        async move {
2310            let mut accessor = Accessor::new(token, Some(self));
2311            closure(&mut accessor).await
2312        }
2313    }
2314
2315    /// Poll the specified future once on behalf of a guest->host call using an
2316    /// async-lowered import.
2317    ///
2318    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2319    /// `Pending`, add it to the set of futures to be polled as part of this
2320    /// instance's event loop until it completes, and then return
2321    /// `Ok(Some(handle))` where `handle` is the waitable handle to pass to the guest.
2322    ///
2323    /// Whether the future returns `Ready` immediately or later, the `lower`
2324    /// function will be used to lower the result, if any, into the guest caller's
2325    /// stack and linear memory unless the task has been cancelled.
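    ///
    /// The "poll once immediately, then hand the future off only if it is
    /// still pending" strategy used below can be shown in isolation with a
    /// small, self-contained sketch (`poll_once` is a hypothetical helper,
    /// independent of any Wasmtime types):
    ///
    /// ```ignore
    /// use std::future::Future;
    /// use std::pin::Pin;
    /// use std::task::{Context, Poll, Waker};
    ///
    /// /// Poll `future` exactly once with a no-op waker, returning its output
    /// /// if it completed immediately and handing the future back otherwise.
    /// fn poll_once<F: Future + Unpin>(mut future: F) -> Result<F::Output, F> {
    ///     match Pin::new(&mut future).poll(&mut Context::from_waker(&Waker::noop())) {
    ///         Poll::Ready(output) => Ok(output),
    ///         Poll::Pending => Err(future),
    ///     }
    /// }
    /// ```
    ///
    /// In the pending case the real implementation adds the future to
    /// `ConcurrentState::futures` so the event loop will poll it with a real
    /// waker later, and returns a waitable handle to the guest.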
2326    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2327        self,
2328        mut store: StoreContextMut<T>,
2329        future: impl Future<Output = Result<R>> + Send + 'static,
2330        caller_instance: RuntimeComponentInstanceIndex,
2331        lower: impl FnOnce(StoreContextMut<T>, Instance, R) -> Result<()> + Send + 'static,
2332    ) -> Result<Option<u32>> {
2333        let token = StoreToken::new(store.as_context_mut());
2334        let state = self.concurrent_state_mut(store.0);
2335        let caller = state.guest_task.unwrap();
2336
2337        // Create an abortable future which hooks calls to poll and manages call
2338        // context state for the future.
2339        let (join_handle, future) = JoinHandle::run(async move {
2340            let mut future = pin!(future);
2341            let mut call_context = None;
2342            future::poll_fn(move |cx| {
2343                // Push the call context for managing any resource borrows
2344                // for the task.
2345                tls::get(|store| {
2346                    if let Some(call_context) = call_context.take() {
2347                        token
2348                            .as_context_mut(store)
2349                            .0
2350                            .component_resource_state()
2351                            .0
2352                            .push(call_context);
2353                    }
2354                });
2355
2356                let result = future.as_mut().poll(cx);
2357
2358                if result.is_pending() {
2359                    // Pop the call context for managing any resource
2360                    // borrows for the task.
2361                    tls::get(|store| {
2362                        call_context = Some(
2363                            token
2364                                .as_context_mut(store)
2365                                .0
2366                                .component_resource_state()
2367                                .0
2368                                .pop()
2369                                .unwrap(),
2370                        );
2371                    });
2372                }
2373                result
2374            })
2375            .await
2376        });
2377
2378        // We create a new host task even though it might complete immediately
2379        // (in which case we won't need to pass a waitable back to the guest).
2380        // If it does complete immediately, we'll remove it before we return.
2381        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2382
2383        log::trace!("new host task child of {caller:?}: {task:?}");
2384        let token = StoreToken::new(store.as_context_mut());
2385
2386        // Map the output of the future to a `HostTaskOutput` responsible for
2387        // lowering the result into the guest's stack and memory, as well as
2388        // notifying any waiters that the task returned.
2389        let mut future = Box::pin(async move {
2390            let result = match future.await {
2391                Some(result) => result,
2392                // Task was cancelled; nothing left to do.
2393                None => return HostTaskOutput::Result(Ok(())),
2394            };
2395            HostTaskOutput::Function(Box::new(move |store, instance| {
2396                let mut store = token.as_context_mut(store);
2397                lower(store.as_context_mut(), instance, result?)?;
2398                let state = instance.concurrent_state_mut(store.0);
2399                state.get_mut(task)?.join_handle.take();
2400                Waitable::Host(task).set_event(
2401                    state,
2402                    Some(Event::Subtask {
2403                        status: Status::Returned,
2404                    }),
2405                )?;
2406
2407                Ok(())
2408            }))
2409        });
2410
2411        // Finally, poll the future.  We can use a dummy `Waker` here because
2412        // we'll add the future to `ConcurrentState::futures` and poll it
2413        // automatically from the event loop if it doesn't complete immediately
2414        // here.
2415        let poll = self.set_tls(store.0, || {
2416            future
2417                .as_mut()
2418                .poll(&mut Context::from_waker(&Waker::noop()))
2419        });
2420
2421        Ok(match poll {
2422            Poll::Ready(output) => {
2423                // It finished immediately; lower the result and delete the
2424                // task.
2425                output.consume(store.0, self)?;
2426                log::trace!("delete host task {task:?} (already ready)");
2427                self.concurrent_state_mut(store.0).delete(task)?;
2428                None
2429            }
2430            Poll::Pending => {
2431                // It hasn't finished yet; add the future to
2432                // `ConcurrentState::futures` so it will be polled by the event
2433                // loop and allocate a waitable handle to return to the guest.
2434                self.concurrent_state_mut(store.0).push_future(future);
2435                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2436                    .subtask_insert_host(task.rep())?;
2437                self.concurrent_state_mut(store.0)
2438                    .get_mut(task)?
2439                    .common
2440                    .handle = Some(handle);
2441                log::trace!(
2442                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2443                );
2444                Some(handle)
2445            }
2446        })
2447    }
2448
2449    /// Poll the specified future until it completes on behalf of a guest->host
2450    /// call using a sync-lowered import.
2451    ///
2452    /// This is similar to `Self::first_poll` except it's for sync-lowered
2453    /// imports, meaning we don't need to handle cancellation and we can block
2454    /// the caller until the task completes, at which point the caller can
2455    /// handle lowering the result to the guest's stack and linear memory.
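    ///
    /// Conceptually this is the blocking counterpart of the sketch shown for
    /// `Self::first_poll`: if the future is not ready immediately, the caller
    /// waits for it instead of receiving a waitable handle.  As a rough
    /// stand-in for what happens here (the real code suspends the calling
    /// fiber and lets the event loop drive the future, rather than blocking a
    /// thread as `futures::executor::block_on` does):
    ///
    /// ```ignore
    /// use std::future::Future;
    ///
    /// fn poll_and_block_sketch<F: Future>(future: F) -> F::Output {
    ///     // Wait for `future` to complete before returning to the caller.
    ///     futures::executor::block_on(future)
    /// }
    /// ```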
2456    pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
2457        self,
2458        store: &mut dyn VMStore,
2459        future: impl Future<Output = Result<R>> + Send + 'static,
2460        caller_instance: RuntimeComponentInstanceIndex,
2461    ) -> Result<R> {
2462        let state = self.concurrent_state_mut(store);
2463
2464        // If there is no current guest task set, that means the host function
2465        // was registered using e.g. `LinkerInstance::func_wrap`, in which case
2466        // it should complete immediately.
2467        let Some(caller) = state.guest_task else {
2468            return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
2469                Poll::Ready(result) => result,
2470                Poll::Pending => {
2471                    unreachable!()
2472                }
2473            };
2474        };
2475
2476        // Save any existing result stashed in `GuestTask::result` so we can
2477        // replace it with the new result.
2478        let old_result = state
2479            .get_mut(caller)
2480            .with_context(|| format!("bad handle: {caller:?}"))?
2481            .result
2482            .take();
2483
2484        // Add a temporary host task into the table so we can track its
2485        // progress.  Note that we'll never allocate a waitable handle for the
2486        // guest since we're being called synchronously.
2487        let task = state.push(HostTask::new(caller_instance, None))?;
2488
2489        log::trace!("new host task child of {caller:?}: {task:?}");
2490
2491        // Map the output of the future to a `HostTaskOutput` which will take
2492        // care of stashing the result in `GuestTask::result` and resuming this
2493        // fiber when the host task completes.
2494        let mut future = Box::pin(future.map(move |result| {
2495            HostTaskOutput::Function(Box::new(move |store, instance| {
2496                let state = instance.concurrent_state_mut(store);
2497                state.get_mut(caller)?.result = Some(Box::new(result?) as _);
2498
2499                Waitable::Host(task).set_event(
2500                    state,
2501                    Some(Event::Subtask {
2502                        status: Status::Returned,
2503                    }),
2504                )?;
2505
2506                Ok(())
2507            }))
2508        })) as HostTaskFuture;
2509
2510        // Finally, poll the future.  We can use a dummy `Waker` here because
2511        // we'll add the future to `ConcurrentState::futures` and poll it
2512        // automatically from the event loop if it doesn't complete immediately
2513        // here.
2514        let poll = self.set_tls(store, || {
2515            future
2516                .as_mut()
2517                .poll(&mut Context::from_waker(&Waker::noop()))
2518        });
2519
2520        match poll {
2521            Poll::Ready(output) => {
2522                // It completed immediately; run the `HostTaskOutput` function
2523                // to stash the result and delete the task.
2524                output.consume(store, self)?;
2525                log::trace!("delete host task {task:?} (already ready)");
2526                self.concurrent_state_mut(store).delete(task)?;
2527            }
2528            Poll::Pending => {
2529                // It did not complete immediately; add it to
2530                // `ConcurrentState::futures` so it will be polled via the event
2531                // loop, then use `GuestTask::sync_call_set` to wait for the
2532                // task to complete, suspending the current fiber until it does
2533                // so.
2534                let state = self.concurrent_state_mut(store);
2535                state.push_future(future);
2536
2537                let set = state.get_mut(caller)?.sync_call_set;
2538                Waitable::Host(task).join(state, Some(set))?;
2539
2540                self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
2541            }
2542        }
2543
2544        // Retrieve and return the result.
2545        Ok(*mem::replace(
2546            &mut self.concurrent_state_mut(store).get_mut(caller)?.result,
2547            old_result,
2548        )
2549        .unwrap()
2550        .downcast()
2551        .unwrap())
2552    }
2553
2554    /// Implements the `task.return` intrinsic, lifting the result for the
2555    /// current guest task.
2556    pub(crate) fn task_return(
2557        self,
2558        store: &mut dyn VMStore,
2559        ty: TypeTupleIndex,
2560        options: OptionsIndex,
2561        storage: &[ValRaw],
2562    ) -> Result<()> {
2563        let state = self.concurrent_state_mut(store);
2564        let CanonicalOptions {
2565            string_encoding,
2566            data_model,
2567            ..
2568        } = *state.options(options);
2569        let guest_task = state.guest_task.unwrap();
2570        let lift = state
2571            .get_mut(guest_task)?
2572            .lift_result
2573            .take()
2574            .ok_or_else(|| {
2575                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2576            })?;
2577        assert!(state.get(guest_task)?.result.is_none());
2578
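        // Validate that the result type and canonical options passed to
        // `task.return` match those recorded when the call was prepared: the
        // same result tuple type, string encoding, and (if applicable) linear
        // memory.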
2579        let invalid = ty != lift.ty
2580            || string_encoding != lift.string_encoding
2581            || match data_model {
2582                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2583                    Some(memory) => {
2584                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2585                        let actual = self.id().get(store).runtime_memory(memory);
2586                        expected != actual
2587                    }
2588                    // Memory not specified, meaning it didn't need to be
2589                    // specified per validation, so not invalid.
2590                    None => false,
2591                },
2592                // Always invalid as this isn't supported.
2593                CanonicalOptionsDataModel::Gc { .. } => true,
2594            };
2595
2596        if invalid {
2597            bail!("invalid `task.return` signature and/or options for current task");
2598        }
2599
2600        log::trace!("task.return for {guest_task:?}");
2601
2602        let result = (lift.lift)(store, self, storage)?;
2603
2604        self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2605    }
2606
2607    /// Implements the `task.cancel` intrinsic.
2608    pub(crate) fn task_cancel(
2609        self,
2610        store: &mut dyn VMStore,
2611        _caller_instance: RuntimeComponentInstanceIndex,
2612    ) -> Result<()> {
2613        let state = self.concurrent_state_mut(store);
2614        let guest_task = state.guest_task.unwrap();
2615        let task = state.get_mut(guest_task)?;
2616        if !task.cancel_sent {
2617            bail!("`task.cancel` called by task which has not been cancelled")
2618        }
2619        _ = task.lift_result.take().ok_or_else(|| {
2620            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2621        })?;
2622
2623        assert!(task.result.is_none());
2624
2625        log::trace!("task.cancel for {guest_task:?}");
2626
2627        self.task_complete(
2628            store,
2629            guest_task,
2630            Box::new(DummyResult),
2631            Status::ReturnCancelled,
2632            ValRaw::i32(0),
2633        )
2634    }
2635
2636    /// Complete the specified guest task (i.e. indicate that it has either
2637    /// returned a (possibly empty) result or cancelled itself).
2638    ///
2639    /// This will return any resource borrows and notify any current or future
2640    /// waiters that the task has completed.
2641    fn task_complete(
2642        self,
2643        store: &mut dyn VMStore,
2644        guest_task: TableId<GuestTask>,
2645        result: Box<dyn Any + Send + Sync>,
2646        status: Status,
2647        post_return_arg: ValRaw,
2648    ) -> Result<()> {
2649        if self
2650            .concurrent_state_mut(store)
2651            .get(guest_task)?
2652            .call_post_return_automatically()
2653        {
2654            let (calls, host_table, _, instance) = store
2655                .store_opaque_mut()
2656                .component_resource_state_with_instance(self);
2657            ResourceTables {
2658                calls,
2659                host_table: Some(host_table),
2660                guest: Some(instance.guest_tables()),
2661            }
2662            .exit_call()?;
2663        } else {
2664            // As of this writing, the only scenario where `call_post_return_automatically`
2665            // would be false for a `GuestTask` is for host-to-guest calls using
2666            // `[Typed]Func::call_async`, in which case the `function_index`
2667            // should be a non-`None` value.
2668            let function_index = self
2669                .concurrent_state_mut(store)
2670                .get(guest_task)?
2671                .function_index
2672                .unwrap();
2673
2674            self.id()
2675                .get_mut(store)
2676                .post_return_arg_set(function_index, post_return_arg);
2677        }
2678
2679        let state = self.concurrent_state_mut(store);
2680        let task = state.get_mut(guest_task)?;
2681
2682        if let Caller::Host { tx, .. } = &mut task.caller {
2683            if let Some(tx) = tx.take() {
2684                _ = tx.send(result);
2685            }
2686        } else {
2687            task.result = Some(result);
2688            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2689        }
2690
2691        Ok(())
2692    }
2693
2694    /// Implements the `waitable-set.wait` intrinsic.
2695    pub(crate) fn waitable_set_wait(
2696        self,
2697        store: &mut dyn VMStore,
2698        options: OptionsIndex,
2699        set: u32,
2700        payload: u32,
2701    ) -> Result<u32> {
2702        let opts = self.concurrent_state_mut(store).options(options);
2703        let async_ = opts.async_;
2704        let caller_instance = opts.instance;
2705        let rep =
2706            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2707
2708        self.waitable_check(
2709            store,
2710            async_,
2711            WaitableCheck::Wait(WaitableCheckParams {
2712                set: TableId::new(rep),
2713                options,
2714                payload,
2715            }),
2716        )
2717    }
2718
2719    /// Implements the `waitable-set.poll` intrinsic.
2720    pub(crate) fn waitable_set_poll(
2721        self,
2722        store: &mut dyn VMStore,
2723        options: OptionsIndex,
2724        set: u32,
2725        payload: u32,
2726    ) -> Result<u32> {
2727        let opts = self.concurrent_state_mut(store).options(options);
2728        let async_ = opts.async_;
2729        let caller_instance = opts.instance;
2730        let rep =
2731            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2732
2733        self.waitable_check(
2734            store,
2735            async_,
2736            WaitableCheck::Poll(WaitableCheckParams {
2737                set: TableId::new(rep),
2738                options,
2739                payload,
2740            }),
2741        )
2742    }
2743
2744    /// Implements the `yield` intrinsic.
2745    pub(crate) fn yield_(self, store: &mut dyn VMStore, async_: bool) -> Result<bool> {
2746        self.waitable_check(store, async_, WaitableCheck::Yield)
2747            .map(|_code| {
2748                // TODO: plumb cancellation to here:
2749                // https://github.com/bytecodealliance/wasmtime/issues/11191
2750                false
2751            })
2752    }
2753
2754    /// Helper function for the `waitable-set.wait`, `waitable-set.poll`, and
2755    /// `yield` intrinsics.
2756    fn waitable_check(
2757        self,
2758        store: &mut dyn VMStore,
2759        async_: bool,
2760        check: WaitableCheck,
2761    ) -> Result<u32> {
2762        if async_ {
2763            bail!(
2764                "todo: async `waitable-set.wait`, `waitable-set.poll`, and `yield` not yet implemented"
2765            );
2766        }
2767
2768        let guest_task = self.concurrent_state_mut(store).guest_task.unwrap();
2769
2770        let (wait, set) = match &check {
2771            WaitableCheck::Wait(params) => (true, Some(params.set)),
2772            WaitableCheck::Poll(params) => (false, Some(params.set)),
2773            WaitableCheck::Yield => (false, None),
2774        };
2775
2776        // First, suspend this fiber, allowing any other tasks to run.
2777        self.suspend(store, SuspendReason::Yielding { task: guest_task })?;
2778
2779        log::trace!("waitable check for {guest_task:?}; set {set:?}");
2780
2781        let state = self.concurrent_state_mut(store);
2782        let task = state.get(guest_task)?;
2783
2784        if wait && task.callback.is_some() {
2785            bail!("cannot call `task.wait` from async-lifted export with callback");
2786        }
2787
2788        // If we're waiting, and there are no events immediately available,
2789        // suspend the fiber until that changes.
2790        if wait {
2791            let set = set.unwrap();
2792
2793            if task.event.is_none() && state.get(set)?.ready.is_empty() {
2794                let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
2795                assert!(old.is_none());
2796
2797                self.suspend(
2798                    store,
2799                    SuspendReason::Waiting {
2800                        set,
2801                        task: guest_task,
2802                    },
2803                )?;
2804            }
2805        }
2806
2807        log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
2808
2809        let result = match check {
2810            // Deliver any pending events to the guest and return.
2811            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
2812                let event = self
2813                    .id()
2814                    .get_mut(store)
2815                    .get_event(guest_task, Some(params.set))?;
2816
2817                let (ordinal, handle, result) = if wait {
2818                    let (event, waitable) = event.unwrap();
2819                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2820                    let (ordinal, result) = event.parts();
2821                    (ordinal, handle, result)
2822                } else {
2823                    if let Some((event, waitable)) = event {
2824                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2825                        let (ordinal, result) = event.parts();
2826                        (ordinal, handle, result)
2827                    } else {
2828                        log::trace!(
2829                            "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
2830                            params.set
2831                        );
2832                        let (ordinal, result) = Event::None.parts();
2833                        (ordinal, 0, result)
2834                    }
2835                };
2836                let store = store.store_opaque_mut();
2837                let options = Options::new_index(store, self, params.options);
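                // The payload pointer refers to a two-field record in guest
                // memory: validate it, then write the waitable handle at
                // offset 0 and the event payload at offset 4.  The event
                // ordinal (discriminant) is returned directly as the
                // intrinsic's result.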
2838                let ptr = func::validate_inbounds::<(u32, u32)>(
2839                    options.memory_mut(store),
2840                    &ValRaw::u32(params.payload),
2841                )?;
2842                options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
2843                options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
2844                Ok(ordinal)
2845            }
2846            // TODO: Check `GuestTask::event` in case it contains
2847            // `Event::Cancelled`, in which case we'll need to return that to
2848            // the guest:
2849            // https://github.com/bytecodealliance/wasmtime/issues/11191
2850            WaitableCheck::Yield => Ok(0),
2851        };
2852
2853        result
2854    }
2855
2856    /// Implements the `subtask.cancel` intrinsic.
2857    pub(crate) fn subtask_cancel(
2858        self,
2859        store: &mut dyn VMStore,
2860        caller_instance: RuntimeComponentInstanceIndex,
2861        async_: bool,
2862        task_id: u32,
2863    ) -> Result<u32> {
2864        let (rep, is_host) =
2865            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
2866        let (waitable, expected_caller_instance) = if is_host {
2867            let id = TableId::<HostTask>::new(rep);
2868            (
2869                Waitable::Host(id),
2870                self.concurrent_state_mut(store).get(id)?.caller_instance,
2871            )
2872        } else {
2873            let id = TableId::<GuestTask>::new(rep);
2874            if let Caller::Guest { instance, .. } =
2875                &self.concurrent_state_mut(store).get(id)?.caller
2876            {
2877                (Waitable::Guest(id), *instance)
2878            } else {
2879                unreachable!()
2880            }
2881        };
2882        // Since waitables can neither be passed between instances nor forged,
2883        // this should never fail unless there's a bug in Wasmtime, but we check
2884        // here to be sure:
2885        assert_eq!(expected_caller_instance, caller_instance);
2886
2887        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
2888
2889        let concurrent_state = self.concurrent_state_mut(store);
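        // For a host task, cancellation simply aborts the spawned future (if
        // it is still running) and reports `ReturnCancelled` immediately;
        // guest tasks require the more involved protocol below.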
2890        if let Waitable::Host(host_task) = waitable {
2891            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
2892                handle.abort();
2893                return Ok(Status::ReturnCancelled as u32);
2894            }
2895        } else {
2896            let caller = concurrent_state.guest_task.unwrap();
2897            let guest_task = TableId::<GuestTask>::new(rep);
2898            let task = concurrent_state.get_mut(guest_task)?;
2899            if task.lower_params.is_some() {
2900                task.lower_params = None;
2901                task.lift_result = None;
2902
2903                // Not yet started; cancel and remove from pending
2904                let callee_instance = task.instance;
2905
2906                let kind = concurrent_state
2907                    .instance_state(callee_instance)
2908                    .pending
2909                    .remove(&guest_task);
2910
2911                if kind.is_none() {
2912                    bail!("`subtask.cancel` called after terminal status delivered");
2913                }
2914
2915                return Ok(Status::StartCancelled as u32);
2916            } else if task.lift_result.is_some() {
2917                // Started, but not yet returned or cancelled; send the
2918                // `CANCELLED` event
2919                task.cancel_sent = true;
2920                // Note that this might overwrite an event that was set earlier
2921                // (e.g. `Event::None` if the task is yielding, or
2922                // `Event::Cancelled` if it was already cancelled), but that's
2923                // okay -- this should supersede the previous state.
2924                task.event = Some(Event::Cancelled);
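                // If the subtask is currently parked waiting on a set, requeue
                // it (either by resuming its fiber or scheduling an event
                // delivery to its callback) so it can observe the
                // cancellation, then yield so it runs before we re-check its
                // state below.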
2925                if let Some(set) = task.wake_on_cancel.take() {
2926                    let item = match concurrent_state
2927                        .get_mut(set)?
2928                        .waiting
2929                        .remove(&guest_task)
2930                        .unwrap()
2931                    {
2932                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
2933                        WaitMode::Callback => WorkItem::GuestCall(GuestCall {
2934                            task: guest_task,
2935                            kind: GuestCallKind::DeliverEvent { set: None },
2936                        }),
2937                    };
2938                    concurrent_state.push_high_priority(item);
2939
2940                    self.suspend(store, SuspendReason::Yielding { task: caller })?;
2941                }
2942
2943                let concurrent_state = self.concurrent_state_mut(store);
2944                let task = concurrent_state.get_mut(guest_task)?;
2945                if task.lift_result.is_some() {
2946                    // Still not yet returned or cancelled; if `async_`, return
2947                    // `BLOCKED`; otherwise wait
2948                    if async_ {
2949                        return Ok(BLOCKED);
2950                    } else {
2951                        let waitable = Waitable::Guest(guest_task);
2952                        let old_set = waitable.common(concurrent_state)?.set;
2953                        let set = concurrent_state.get_mut(caller)?.sync_call_set;
2954                        waitable.join(concurrent_state, Some(set))?;
2955
2956                        self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
2957
2958                        waitable.join(self.concurrent_state_mut(store), old_set)?;
2959                    }
2960                }
2961            }
2962        }
2963
2964        let event = waitable.take_event(self.concurrent_state_mut(store))?;
2965        if let Some(Event::Subtask {
2966            status: status @ (Status::Returned | Status::ReturnCancelled),
2967        }) = event
2968        {
2969            Ok(status as u32)
2970        } else {
2971            bail!("`subtask.cancel` called after terminal status delivered");
2972        }
2973    }
2974
2975    /// Configures TLS state so `store` will be available via `tls::get` within
2976    /// the closure `f` provided.
2977    ///
2978    /// This is used to ensure that `Future::poll`, which doesn't take a `store`
2979    /// parameter, is still able to access the `store` while host task futures
2980    /// are being polled.
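    ///
    /// A minimal sketch of how this is used when polling a host task future,
    /// mirroring a call site earlier in this module:
    ///
    /// ```ignore
    /// let poll = self.set_tls(store, || {
    ///     future
    ///         .as_mut()
    ///         .poll(&mut Context::from_waker(&Waker::noop()))
    /// });
    /// ```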
2981    fn set_tls<R>(self, store: &mut dyn VMStore, f: impl FnOnce() -> R) -> R {
2982        struct Reset<'a>(&'a mut dyn VMStore, Option<ComponentInstanceId>);
2983
2984        impl Drop for Reset<'_> {
2985            fn drop(&mut self) {
2986                self.0.concurrent_async_state_mut().current_instance = self.1;
2987            }
2988        }
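        // Record this instance as the "current" one for the duration of `f`,
        // using the `Reset` guard above to restore the previous value even if
        // `f` unwinds.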
2989        let prev = mem::replace(
2990            &mut store.concurrent_async_state_mut().current_instance,
2991            Some(self.id().instance()),
2992        );
2993        let reset = Reset(store, prev);
2994
2995        tls::set(reset.0, f)
2996    }
2997
2998    /// Convenience function to reduce boilerplate.
2999    pub(crate) fn concurrent_state_mut<'a>(
3000        &self,
3001        store: &'a mut StoreOpaque,
3002    ) -> &'a mut ConcurrentState {
3003        self.id().get_mut(store).concurrent_state_mut()
3004    }
3005}
3006
3007/// Trait representing component model ABI async intrinsics and fused adapter
3008/// helper functions.
3009///
3010/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3011/// which must be valid for at least the duration of the call (and possibly for
3012/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3013/// pointers used for async calls).
3014pub trait VMComponentAsyncStore {
3015    /// A helper function for fused adapter modules involving calls where at
3016    /// least one of the caller or callee is async.
3017    ///
3018    /// This helper is not used when the caller and callee both use the sync
3019    /// ABI; it is only used when at least one of them is async.
3020    unsafe fn prepare_call(
3021        &mut self,
3022        instance: Instance,
3023        memory: *mut VMMemoryDefinition,
3024        start: *mut VMFuncRef,
3025        return_: *mut VMFuncRef,
3026        caller_instance: RuntimeComponentInstanceIndex,
3027        callee_instance: RuntimeComponentInstanceIndex,
3028        task_return_type: TypeTupleIndex,
3029        string_encoding: u8,
3030        result_count: u32,
3031        storage: *mut ValRaw,
3032        storage_len: usize,
3033    ) -> Result<()>;
3034
3035    /// A helper function for fused adapter modules involving calls where the
3036    /// caller is sync-lowered but the callee is async-lifted.
3037    unsafe fn sync_start(
3038        &mut self,
3039        instance: Instance,
3040        callback: *mut VMFuncRef,
3041        callee: *mut VMFuncRef,
3042        param_count: u32,
3043        storage: *mut MaybeUninit<ValRaw>,
3044        storage_len: usize,
3045    ) -> Result<()>;
3046
3047    /// A helper function for fused adapter modules involving calls where the
3048    /// caller is async-lowered.
3049    unsafe fn async_start(
3050        &mut self,
3051        instance: Instance,
3052        callback: *mut VMFuncRef,
3053        post_return: *mut VMFuncRef,
3054        callee: *mut VMFuncRef,
3055        param_count: u32,
3056        result_count: u32,
3057        flags: u32,
3058    ) -> Result<u32>;
3059
3060    /// The `future.write` intrinsic.
3061    fn future_write(
3062        &mut self,
3063        instance: Instance,
3064        ty: TypeFutureTableIndex,
3065        options: OptionsIndex,
3066        future: u32,
3067        address: u32,
3068    ) -> Result<u32>;
3069
3070    /// The `future.read` intrinsic.
3071    fn future_read(
3072        &mut self,
3073        instance: Instance,
3074        ty: TypeFutureTableIndex,
3075        options: OptionsIndex,
3076        future: u32,
3077        address: u32,
3078    ) -> Result<u32>;
3079
3080    /// The `future.drop-writable` intrinsic.
3081    fn future_drop_writable(
3082        &mut self,
3083        instance: Instance,
3084        ty: TypeFutureTableIndex,
3085        writer: u32,
3086    ) -> Result<()>;
3087
3088    /// The `stream.write` intrinsic.
3089    fn stream_write(
3090        &mut self,
3091        instance: Instance,
3092        ty: TypeStreamTableIndex,
3093        options: OptionsIndex,
3094        stream: u32,
3095        address: u32,
3096        count: u32,
3097    ) -> Result<u32>;
3098
3099    /// The `stream.read` intrinsic.
3100    fn stream_read(
3101        &mut self,
3102        instance: Instance,
3103        ty: TypeStreamTableIndex,
3104        options: OptionsIndex,
3105        stream: u32,
3106        address: u32,
3107        count: u32,
3108    ) -> Result<u32>;
3109
3110    /// The "fast-path" implementation of the `stream.write` intrinsic for
3111    /// "flat" (i.e. memcpy-able) payloads.
3112    fn flat_stream_write(
3113        &mut self,
3114        instance: Instance,
3115        ty: TypeStreamTableIndex,
3116        options: OptionsIndex,
3117        payload_size: u32,
3118        payload_align: u32,
3119        stream: u32,
3120        address: u32,
3121        count: u32,
3122    ) -> Result<u32>;
3123
3124    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3125    /// (i.e. memcpy-able) payloads.
3126    fn flat_stream_read(
3127        &mut self,
3128        instance: Instance,
3129        ty: TypeStreamTableIndex,
3130        options: OptionsIndex,
3131        payload_size: u32,
3132        payload_align: u32,
3133        stream: u32,
3134        address: u32,
3135        count: u32,
3136    ) -> Result<u32>;
3137
3138    /// The `stream.drop-writable` intrinsic.
3139    fn stream_drop_writable(
3140        &mut self,
3141        instance: Instance,
3142        ty: TypeStreamTableIndex,
3143        writer: u32,
3144    ) -> Result<()>;
3145
3146    /// The `error-context.debug-message` intrinsic.
3147    fn error_context_debug_message(
3148        &mut self,
3149        instance: Instance,
3150        ty: TypeComponentLocalErrorContextTableIndex,
3151        options: OptionsIndex,
3152        err_ctx_handle: u32,
3153        debug_msg_address: u32,
3154    ) -> Result<()>;
3155}
3156
3157/// SAFETY: See trait docs.
3158impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3159    unsafe fn prepare_call(
3160        &mut self,
3161        instance: Instance,
3162        memory: *mut VMMemoryDefinition,
3163        start: *mut VMFuncRef,
3164        return_: *mut VMFuncRef,
3165        caller_instance: RuntimeComponentInstanceIndex,
3166        callee_instance: RuntimeComponentInstanceIndex,
3167        task_return_type: TypeTupleIndex,
3168        string_encoding: u8,
3169        result_count_or_max_if_async: u32,
3170        storage: *mut ValRaw,
3171        storage_len: usize,
3172    ) -> Result<()> {
3173        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3174        // this method will have ensured that `storage` is a valid
3175        // pointer containing at least `storage_len` items.
3176        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3177
3178        unsafe {
3179            instance.prepare_call(
3180                StoreContextMut(self),
3181                start,
3182                return_,
3183                caller_instance,
3184                callee_instance,
3185                task_return_type,
3186                memory,
3187                string_encoding,
3188                match result_count_or_max_if_async {
3189                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3190                        params,
3191                        has_result: false,
3192                    },
3193                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3194                        params,
3195                        has_result: true,
3196                    },
3197                    result_count => CallerInfo::Sync {
3198                        params,
3199                        result_count,
3200                    },
3201                },
3202            )
3203        }
3204    }
3205
3206    unsafe fn sync_start(
3207        &mut self,
3208        instance: Instance,
3209        callback: *mut VMFuncRef,
3210        callee: *mut VMFuncRef,
3211        param_count: u32,
3212        storage: *mut MaybeUninit<ValRaw>,
3213        storage_len: usize,
3214    ) -> Result<()> {
3215        unsafe {
3216            instance
3217                .start_call(
3218                    StoreContextMut(self),
3219                    callback,
3220                    ptr::null_mut(),
3221                    callee,
3222                    param_count,
3223                    1,
3224                    START_FLAG_ASYNC_CALLEE,
3225                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3226                    // this method will have ensured that `storage` is a valid
3227                    // pointer containing at least `storage_len` items.
3228                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3229                )
3230                .map(drop)
3231        }
3232    }
3233
3234    unsafe fn async_start(
3235        &mut self,
3236        instance: Instance,
3237        callback: *mut VMFuncRef,
3238        post_return: *mut VMFuncRef,
3239        callee: *mut VMFuncRef,
3240        param_count: u32,
3241        result_count: u32,
3242        flags: u32,
3243    ) -> Result<u32> {
3244        unsafe {
3245            instance.start_call(
3246                StoreContextMut(self),
3247                callback,
3248                post_return,
3249                callee,
3250                param_count,
3251                result_count,
3252                flags,
3253                None,
3254            )
3255        }
3256    }
3257
3258    fn future_write(
3259        &mut self,
3260        instance: Instance,
3261        ty: TypeFutureTableIndex,
3262        options: OptionsIndex,
3263        future: u32,
3264        address: u32,
3265    ) -> Result<u32> {
3266        instance
3267            .guest_write(
3268                StoreContextMut(self),
3269                TransmitIndex::Future(ty),
3270                options,
3271                None,
3272                future,
3273                address,
3274                1,
3275            )
3276            .map(|result| result.encode())
3277    }
3278
3279    fn future_read(
3280        &mut self,
3281        instance: Instance,
3282        ty: TypeFutureTableIndex,
3283        options: OptionsIndex,
3284        future: u32,
3285        address: u32,
3286    ) -> Result<u32> {
3287        instance
3288            .guest_read(
3289                StoreContextMut(self),
3290                TransmitIndex::Future(ty),
3291                options,
3292                None,
3293                future,
3294                address,
3295                1,
3296            )
3297            .map(|result| result.encode())
3298    }
3299
3300    fn stream_write(
3301        &mut self,
3302        instance: Instance,
3303        ty: TypeStreamTableIndex,
3304        options: OptionsIndex,
3305        stream: u32,
3306        address: u32,
3307        count: u32,
3308    ) -> Result<u32> {
3309        instance
3310            .guest_write(
3311                StoreContextMut(self),
3312                TransmitIndex::Stream(ty),
3313                options,
3314                None,
3315                stream,
3316                address,
3317                count,
3318            )
3319            .map(|result| result.encode())
3320    }
3321
3322    fn stream_read(
3323        &mut self,
3324        instance: Instance,
3325        ty: TypeStreamTableIndex,
3326        options: OptionsIndex,
3327        stream: u32,
3328        address: u32,
3329        count: u32,
3330    ) -> Result<u32> {
3331        instance
3332            .guest_read(
3333                StoreContextMut(self),
3334                TransmitIndex::Stream(ty),
3335                options,
3336                None,
3337                stream,
3338                address,
3339                count,
3340            )
3341            .map(|result| result.encode())
3342    }
3343
3344    fn future_drop_writable(
3345        &mut self,
3346        instance: Instance,
3347        ty: TypeFutureTableIndex,
3348        writer: u32,
3349    ) -> Result<()> {
3350        instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Future(ty), writer)
3351    }
3352
3353    fn flat_stream_write(
3354        &mut self,
3355        instance: Instance,
3356        ty: TypeStreamTableIndex,
3357        options: OptionsIndex,
3358        payload_size: u32,
3359        payload_align: u32,
3360        stream: u32,
3361        address: u32,
3362        count: u32,
3363    ) -> Result<u32> {
3364        instance
3365            .guest_write(
3366                StoreContextMut(self),
3367                TransmitIndex::Stream(ty),
3368                options,
3369                Some(FlatAbi {
3370                    size: payload_size,
3371                    align: payload_align,
3372                }),
3373                stream,
3374                address,
3375                count,
3376            )
3377            .map(|result| result.encode())
3378    }
3379
3380    fn flat_stream_read(
3381        &mut self,
3382        instance: Instance,
3383        ty: TypeStreamTableIndex,
3384        options: OptionsIndex,
3385        payload_size: u32,
3386        payload_align: u32,
3387        stream: u32,
3388        address: u32,
3389        count: u32,
3390    ) -> Result<u32> {
3391        instance
3392            .guest_read(
3393                StoreContextMut(self),
3394                TransmitIndex::Stream(ty),
3395                options,
3396                Some(FlatAbi {
3397                    size: payload_size,
3398                    align: payload_align,
3399                }),
3400                stream,
3401                address,
3402                count,
3403            )
3404            .map(|result| result.encode())
3405    }
3406
3407    fn stream_drop_writable(
3408        &mut self,
3409        instance: Instance,
3410        ty: TypeStreamTableIndex,
3411        writer: u32,
3412    ) -> Result<()> {
3413        instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Stream(ty), writer)
3414    }
3415
3416    fn error_context_debug_message(
3417        &mut self,
3418        instance: Instance,
3419        ty: TypeComponentLocalErrorContextTableIndex,
3420        options: OptionsIndex,
3421        err_ctx_handle: u32,
3422        debug_msg_address: u32,
3423    ) -> Result<()> {
3424        instance.error_context_debug_message(
3425            StoreContextMut(self),
3426            ty,
3427            options,
3428            err_ctx_handle,
3429            debug_msg_address,
3430        )
3431    }
3432}
3433
3434/// Represents the output of a host task or background task.
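///
/// A minimal sketch of wrapping a completion callback, mirroring how host
/// task results are handled earlier in this module (the closure body shown is
/// purely illustrative):
///
/// ```ignore
/// let output = HostTaskOutput::Function(Box::new(move |store, instance| {
///     // e.g. stash a lifted result in the caller's `GuestTask` and notify
///     // any waiters via `Waitable::set_event`
///     Ok(())
/// }));
/// ```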
3435pub(crate) enum HostTaskOutput {
3436    /// A plain result
3437    Result(Result<()>),
3438    /// A function to be run after the future completes (e.g. post-processing
3439    /// which requires access to the store and instance).
3440    Function(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>),
3441}
3442
3443impl HostTaskOutput {
3444    /// Retrieve the result of the host or background task, running the
3445    /// post-processing function if present.
3446    fn consume(self, store: &mut dyn VMStore, instance: Instance) -> Result<()> {
3447        match self {
3448            Self::Function(fun) => fun(store, instance),
3449            Self::Result(result) => result,
3450        }
3451    }
3452}
3453
3454type HostTaskFuture = Pin<Box<dyn Future<Output = HostTaskOutput> + Send + 'static>>;
3455
3456/// Represents the state of a pending host task.
3457struct HostTask {
3458    common: WaitableCommon,
3459    caller_instance: RuntimeComponentInstanceIndex,
3460    join_handle: Option<JoinHandle>,
3461}
3462
3463impl HostTask {
3464    fn new(
3465        caller_instance: RuntimeComponentInstanceIndex,
3466        join_handle: Option<JoinHandle>,
3467    ) -> Self {
3468        Self {
3469            common: WaitableCommon::default(),
3470            caller_instance,
3471            join_handle,
3472        }
3473    }
3474}
3475
3476impl TableDebug for HostTask {
3477    fn type_name() -> &'static str {
3478        "HostTask"
3479    }
3480}
3481
3482type CallbackFn = Box<
3483    dyn Fn(&mut dyn VMStore, Instance, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3484        + Send
3485        + Sync
3486        + 'static,
3487>;
3488
3489/// Represents the caller of a given guest task.
3490enum Caller {
3491    /// The host called the guest task.
3492    Host {
3493        /// If present, may be used to deliver the result.
3494        tx: Option<oneshot::Sender<LiftedResult>>,
3495        /// If true, automatically remove the task from the concurrent
3496        /// state that owns it after it completes.
3497        remove_task_automatically: bool,
3498        /// If true, call `post-return` function (if any) automatically.
3499        call_post_return_automatically: bool,
3500    },
3501    /// Another guest task called this guest task.
3502    Guest {
3503        /// The id of the caller
3504        task: TableId<GuestTask>,
3505        /// The instance to use to enforce reentrance rules.
3506        ///
3507        /// Note that this might not be the same as the instance the caller task
3508        /// started executing in given that one or more synchronous guest->guest
3509        /// calls may have occurred involving multiple instances.
3510        instance: RuntimeComponentInstanceIndex,
3511    },
3512}
3513
3514/// Represents a closure and related canonical ABI parameters required to
3515/// validate a `task.return` call at runtime and lift the result.
3516struct LiftResult {
3517    lift: RawLift,
3518    ty: TypeTupleIndex,
3519    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3520    string_encoding: StringEncoding,
3521}
3522
3523/// Represents a pending guest task.
3524struct GuestTask {
3525    /// See `WaitableCommon`
3526    common: WaitableCommon,
3527    /// Closure to lower the parameters passed to this task.
3528    lower_params: Option<RawLower>,
3529    /// See `LiftResult`
3530    lift_result: Option<LiftResult>,
3531    /// A place to stash the type-erased lifted result if it can't be delivered
3532    /// immediately.
3533    result: Option<LiftedResult>,
3534    /// Closure to call the callback function for an async-lifted export, if
3535    /// provided.
3536    callback: Option<CallbackFn>,
3537    /// See `Caller`
3538    caller: Caller,
3539    /// A place to stash the call context for managing resource borrows while
3540    /// switching between guest tasks.
3541    call_context: Option<CallContext>,
3542    /// A place to stash the lowered result for a sync-to-async call until it
3543    /// can be returned to the caller.
3544    sync_result: Option<Option<ValRaw>>,
3545    /// Whether or not the task has been cancelled (i.e. whether the task is
3546    /// permitted to call `task.cancel`).
3547    cancel_sent: bool,
3548    /// Whether or not we've sent a `Status::Starting` event to any current or
3549    /// future waiters for this waitable.
3550    starting_sent: bool,
3551    /// Context-local state used to implement the `context.{get,set}`
3552    /// intrinsics.
3553    context: [u32; 2],
3554    /// Pending guest subtasks created by this task (directly or indirectly).
3555    ///
3556    /// This is used to re-parent subtasks which are still running when their
3557    /// parent task is disposed.
3558    subtasks: HashSet<TableId<GuestTask>>,
3559    /// Scratch waitable set used to watch subtasks during synchronous calls.
3560    sync_call_set: TableId<WaitableSet>,
3561    /// The instance to which the exported function for this guest task belongs.
3562    ///
3563    /// Note that the task may do a sync->sync call via a fused adapter which
3564    /// results in that task executing code in a different instance, and it may
3565    /// call host functions and intrinsics from that other instance.
3566    instance: RuntimeComponentInstanceIndex,
3567    /// If present, a pending `Event::None` or `Event::Cancelled` to be
3568    /// delivered to this task.
3569    event: Option<Event>,
3570    /// If present, indicates that the task is currently waiting on the
3571    /// specified set but may be cancelled and woken immediately.
3572    wake_on_cancel: Option<TableId<WaitableSet>>,
3573    /// The `ExportIndex` of the guest function being called, if known.
3574    function_index: Option<ExportIndex>,
3575    /// Whether or not the task has exited.
3576    exited: bool,
3577}
3578
3579impl GuestTask {
3580    fn new(
3581        state: &mut ConcurrentState,
3582        lower_params: RawLower,
3583        lift_result: LiftResult,
3584        caller: Caller,
3585        callback: Option<CallbackFn>,
3586        component_instance: RuntimeComponentInstanceIndex,
3587    ) -> Result<Self> {
3588        let sync_call_set = state.push(WaitableSet::default())?;
3589
3590        Ok(Self {
3591            common: WaitableCommon::default(),
3592            lower_params: Some(lower_params),
3593            lift_result: Some(lift_result),
3594            result: None,
3595            callback,
3596            caller,
3597            call_context: Some(CallContext::default()),
3598            sync_result: None,
3599            cancel_sent: false,
3600            starting_sent: false,
3601            context: [0u32; 2],
3602            subtasks: HashSet::new(),
3603            sync_call_set,
3604            instance: component_instance,
3605            event: None,
3606            wake_on_cancel: None,
3607            function_index: None,
3608            exited: false,
3609        })
3610    }
3611
3612    /// Dispose of this guest task, reparenting any pending subtasks to the
3613    /// caller.
3614    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3615        // If there are not-yet-delivered completion events for subtasks in
3616        // `self.sync_call_set`, recursively dispose of those subtasks as well.
3617        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3618            if let Some(Event::Subtask {
3619                status: Status::Returned | Status::ReturnCancelled,
3620            }) = waitable.common(state)?.event
3621            {
3622                waitable.delete_from(state)?;
3623            }
3624        }
3625
3626        state.delete(self.sync_call_set)?;
3627
3628        // Reparent any pending subtasks to the caller.
3629        if let Caller::Guest {
3630            task,
3631            instance: runtime_instance,
3632        } = &self.caller
3633        {
3634            let task_mut = state.get_mut(*task)?;
3635            let present = task_mut.subtasks.remove(&me);
3636            assert!(present);
3637
3638            for subtask in &self.subtasks {
3639                task_mut.subtasks.insert(*subtask);
3640            }
3641
3642            for subtask in &self.subtasks {
3643                state.get_mut(*subtask)?.caller = Caller::Guest {
3644                    task: *task,
3645                    instance: *runtime_instance,
3646                };
3647            }
3648        } else {
3649            for subtask in &self.subtasks {
3650                state.get_mut(*subtask)?.caller = Caller::Host {
3651                    tx: None,
3652                    remove_task_automatically: true,
3653                    call_post_return_automatically: true,
3654                };
3655            }
3656        }
3657
3658        Ok(())
3659    }
3660
3661    fn call_post_return_automatically(&self) -> bool {
3662        matches!(
3663            self.caller,
3664            Caller::Guest { .. }
3665                | Caller::Host {
3666                    call_post_return_automatically: true,
3667                    ..
3668                }
3669        )
3670    }
3671}
3672
3673impl TableDebug for GuestTask {
3674    fn type_name() -> &'static str {
3675        "GuestTask"
3676    }
3677}
3678
3679/// Represents state common to all kinds of waitables.
3680#[derive(Default)]
3681struct WaitableCommon {
3682    /// The currently pending event for this waitable, if any.
3683    event: Option<Event>,
3684    /// The set to which this waitable belongs, if any.
3685    set: Option<TableId<WaitableSet>>,
3686    /// The handle with which the guest refers to this waitable, if any.
3687    handle: Option<u32>,
3688}
3689
3690/// Represents a Component Model Async `waitable`.
3691#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3692enum Waitable {
3693    /// A host task
3694    Host(TableId<HostTask>),
3695    /// A guest task
3696    Guest(TableId<GuestTask>),
3697    /// The read or write end of a stream or future
3698    Transmit(TableId<TransmitHandle>),
3699}
3700
3701impl Waitable {
3702    /// Retrieve the `Waitable` corresponding to the specified guest-visible
3703    /// handle.
3704    fn from_instance(
3705        state: Pin<&mut ComponentInstance>,
3706        caller_instance: RuntimeComponentInstanceIndex,
3707        waitable: u32,
3708    ) -> Result<Self> {
3709        use crate::runtime::vm::component::Waitable;
3710
3711        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
3712
3713        Ok(match kind {
3714            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
3715            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
3716            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
3717        })
3718    }
3719
3720    /// Retrieve the host-visible identifier for this `Waitable`.
3721    fn rep(&self) -> u32 {
3722        match self {
3723            Self::Host(id) => id.rep(),
3724            Self::Guest(id) => id.rep(),
3725            Self::Transmit(id) => id.rep(),
3726        }
3727    }
3728
3729    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
3730    /// remove it from any set it may currently belong to (when `set` is
3731    /// `None`).
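    ///
    /// A minimal sketch of the pattern used elsewhere in this module when a
    /// task waits synchronously on a subtask (`state`, `task`, and `caller`
    /// are assumed to already exist):
    ///
    /// ```ignore
    /// let set = state.get_mut(caller)?.sync_call_set;
    /// Waitable::Host(task).join(state, Some(set))?; // watch via the caller's scratch set
    /// // ... suspend until an event arrives on `set` ...
    /// Waitable::Host(task).join(state, None)?; // detach again afterwards
    /// ```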
3732    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3733        log::trace!("waitable {self:?} join set {set:?}");
3734
3735        let old = mem::replace(&mut self.common(state)?.set, set);
3736
3737        if let Some(old) = old {
3738            match *self {
3739                Waitable::Host(id) => state.remove_child(id, old),
3740                Waitable::Guest(id) => state.remove_child(id, old),
3741                Waitable::Transmit(id) => state.remove_child(id, old),
3742            }?;
3743
3744            state.get_mut(old)?.ready.remove(self);
3745        }
3746
3747        if let Some(set) = set {
3748            match *self {
3749                Waitable::Host(id) => state.add_child(id, set),
3750                Waitable::Guest(id) => state.add_child(id, set),
3751                Waitable::Transmit(id) => state.add_child(id, set),
3752            }?;
3753
3754            if self.common(state)?.event.is_some() {
3755                self.mark_ready(state)?;
3756            }
3757        }
3758
3759        Ok(())
3760    }
3761
3762    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
3763    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3764        Ok(match self {
3765            Self::Host(id) => &mut state.get_mut(*id)?.common,
3766            Self::Guest(id) => &mut state.get_mut(*id)?.common,
3767            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3768        })
3769    }
3770
3771    /// Set or clear the pending event for this waitable and either deliver it
3772    /// to the first waiter, if any, or mark it as ready to be delivered to the
3773    /// next waiter that arrives.
3774    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3775        log::trace!("set event for {self:?}: {event:?}");
3776        self.common(state)?.event = event;
3777        self.mark_ready(state)
3778    }
3779
3780    /// Take the pending event from this waitable, leaving `None` in its place.
3781    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
3782        let common = self.common(state)?;
3783        let event = common.event.take();
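        // Consuming the event means this waitable is no longer ready, so also
        // remove it from its set's ready list, if any.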
3784        if let Some(set) = self.common(state)?.set {
3785            state.get_mut(set)?.ready.remove(self);
3786        }
3787        Ok(event)
3788    }
3789
3790    /// Deliver the current event for this waitable to the first waiter, if any,
3791    /// or else mark it as ready to be delivered to the next waiter that
3792    /// arrives.
3793    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
3794        if let Some(set) = self.common(state)?.set {
3795            state.get_mut(set)?.ready.insert(*self);
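            // If a task is already waiting on this set, deliver the event to
            // the first such waiter, either by resuming its fiber or by
            // scheduling a callback-style event delivery.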
3796            if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
3797                let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
3798                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
3799
3800                let item = match mode {
3801                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3802                    WaitMode::Callback => WorkItem::GuestCall(GuestCall {
3803                        task,
3804                        kind: GuestCallKind::DeliverEvent { set: Some(set) },
3805                    }),
3806                };
3807                state.push_high_priority(item);
3808            }
3809        }
3810        Ok(())
3811    }
3812
3813    /// Handle the imminent delivery of the specified event, e.g. by updating
3814    /// the state of the stream or future.
3815    fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
3816        match event {
3817            Event::FutureRead {
3818                pending: Some((ty, handle)),
3819                ..
3820            }
3821            | Event::FutureWrite {
3822                pending: Some((ty, handle)),
3823                ..
3824            } => {
3825                let runtime_instance = instance.component().types()[ty].instance;
3826                let (rep, state) = instance.guest_tables().0[runtime_instance]
3827                    .future_rep(ty, handle)
3828                    .unwrap();
3829                assert_eq!(rep, self.rep());
3830                assert_eq!(*state, TransmitLocalState::Busy);
3831                *state = match event {
3832                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
3833                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
3834                    _ => unreachable!(),
3835                };
3836            }
3837            Event::StreamRead {
3838                pending: Some((ty, handle)),
3839                code,
3840            }
3841            | Event::StreamWrite {
3842                pending: Some((ty, handle)),
3843                code,
3844            } => {
3845                let runtime_instance = instance.component().types()[ty].instance;
3846                let (rep, state) = instance.guest_tables().0[runtime_instance]
3847                    .stream_rep(ty, handle)
3848                    .unwrap();
3849                assert_eq!(rep, self.rep());
3850                assert_eq!(*state, TransmitLocalState::Busy);
3851                let done = matches!(code, ReturnCode::Dropped(_));
3852                *state = match event {
3853                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
3854                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
3855                    _ => unreachable!(),
3856                };
3857            }
3858            _ => {}
3859        }
3860    }
3861
3862    /// Remove this waitable from the instance's rep table.
3863    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
3864        match self {
3865            Self::Host(task) => {
3866                log::trace!("delete host task {task:?}");
3867                state.delete(*task)?;
3868            }
3869            Self::Guest(task) => {
3870                log::trace!("delete guest task {task:?}");
3871                state.delete(*task)?.dispose(state, *task)?;
3872            }
3873            Self::Transmit(task) => {
3874                state.delete(*task)?;
3875            }
3876        }
3877
3878        Ok(())
3879    }
3880}
3881
3882impl fmt::Debug for Waitable {
3883    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3884        match self {
3885            Self::Host(id) => write!(f, "{id:?}"),
3886            Self::Guest(id) => write!(f, "{id:?}"),
3887            Self::Transmit(id) => write!(f, "{id:?}"),
3888        }
3889    }
3890}
3891
3892/// Represents a Component Model Async `waitable-set`.
3893#[derive(Default)]
3894struct WaitableSet {
3895    /// Which waitables in this set have pending events, if any.
3896    ready: BTreeSet<Waitable>,
3897    /// Which guest tasks are currently waiting on this set, if any.
3898    waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
3899}
3900
3901impl TableDebug for WaitableSet {
3902    fn type_name() -> &'static str {
3903        "WaitableSet"
3904    }
3905}
3906
3907/// Type-erased closure to lower the parameters for a guest task.
3908type RawLower = Box<
3909    dyn FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
3910>;
3911
3912/// Type-erased closure to lift the result for a guest task.
3913type RawLift = Box<
3914    dyn FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
3915        + Send
3916        + Sync,
3917>;
3918
3919/// Type-erased result of a guest task which may be downcast to the expected
3920/// type by a host caller (or simply ignored in the case of a guest caller; see
3921/// `DummyResult`).
3922type LiftedResult = Box<dyn Any + Send + Sync>;
3923
3924/// Used to return a result from a `LiftFn` when the actual result has already
3925/// been lowered to a guest task's stack and linear memory.
3926struct DummyResult;
3927
3928/// Represents the state of a currently executing fiber which has been resumed
3929/// via `self::poll_fn`.
3930pub(crate) struct AsyncState {
3931    /// The current instance being polled, if any, which is used to perform
3932    /// checks to ensure that futures are always polled within the correct
3933    /// instance.
3934    current_instance: Option<ComponentInstanceId>,
3935}
3936
3937impl Default for AsyncState {
3938    fn default() -> Self {
3939        Self {
3940            current_instance: None,
3941        }
3942    }
3943}
3944
3945/// Represents the Component Model Async state of a (sub-)component instance.
3946#[derive(Default)]
3947struct InstanceState {
3948    /// Whether backpressure is set for this instance
3949    backpressure: bool,
3950    /// Whether this instance can be entered
3951    do_not_enter: bool,
3952    /// Pending calls for this instance, which cannot proceed until both
3953    /// `Self::backpressure` and `Self::do_not_enter` are `false`.
3954    pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
3955}
3956
3957/// Represents the Component Model Async state of a top-level component instance
3958/// (i.e. a `super::ComponentInstance`).
3959pub struct ConcurrentState {
3960    /// The currently running guest task, if any.
3961    guest_task: Option<TableId<GuestTask>>,
3962    /// The set of pending host and background tasks, if any.
3963    ///
3964    /// We must wrap this in a `Mutex` to ensure that `ComponentInstance` and
3965    /// `Store` satisfy a `Sync` bound, but it can't actually be accessed from
3966    /// more than one thread at a time.
3967    ///
3968    /// See `ComponentInstance::poll_until` for where we temporarily take this
3969    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
3970    futures: Mutex<Option<FuturesUnordered<HostTaskFuture>>>,
3971    /// The table of waitables, waitable sets, etc.
3972    table: Table,
3973    /// Per (sub-)component instance states.
3974    ///
3975    /// See `InstanceState` for details and note that this map is lazily
3976    /// populated as needed.
3977    // TODO: this can and should be a `PrimaryMap`
3978    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
3979    /// The "high priority" work queue for this instance's event loop.
3980    high_priority: Vec<WorkItem>,
3981    /// The "high priority" work queue for this instance's event loop.
3982    low_priority: Vec<WorkItem>,
3983    /// A place to stash the reason a fiber is suspending so that the code which
3984    /// resumed it will know under what conditions the fiber should be resumed
3985    /// again.
3986    suspend_reason: Option<SuspendReason>,
3987    /// A cached fiber which is waiting for work to do.
3988    ///
3989    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
3990    worker: Option<StoreFiber<'static>>,
3991    /// A place to stash the work item for which we're resuming a worker fiber.
3992    worker_item: Option<WorkerItem>,
3993
3994    /// Reference counts for all component error contexts
3995    ///
3996    /// NOTE: it is possible the global ref count to be *greater* than the sum of
3997    /// (sub)component ref counts as tracked by `error_context_tables`, for
3998    /// example when the host holds one or more references to error contexts.
3999    ///
4000    /// The key of this map is often referred to as the "rep" (i.e. host-side
4001    /// component-wide representation) of the index into concurrent state for a given
4002    /// stored `ErrorContext`.
4003    ///
4004    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4005    /// as a `TableId<ErrorContextState>`.
4006    global_error_context_ref_counts:
4007        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4008
4009    /// Mirror of type information in `ComponentInstance`, placed here for
4010    /// convenience at the cost of an extra `Arc` clone.
4011    component: Component,
4012}
4013
4014impl ConcurrentState {
4015    pub(crate) fn new(component: &Component) -> Self {
4016        Self {
4017            guest_task: None,
4018            table: Table::new(),
4019            futures: Mutex::new(Some(FuturesUnordered::new())),
4020            instance_states: HashMap::new(),
4021            high_priority: Vec::new(),
4022            low_priority: Vec::new(),
4023            suspend_reason: None,
4024            worker: None,
4025            worker_item: None,
4026            global_error_context_ref_counts: BTreeMap::new(),
4027            component: component.clone(),
4028        }
4029    }
4030
4031    /// Take ownership of any fibers and futures owned by this object.
4032    ///
4033    /// This should be used when disposing of the `Store` containing this object
4034    /// in order to gracefully resolve any and all fibers using
4035    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4036    /// use-after-free bugs due to fibers which may still have access to the
4037    /// `Store`.
4038    ///
4039    /// Additionally, the futures collected with this function should be dropped
4040    /// within a `tls::set` call, which will ensure that any futures closing
4041    /// over an `&Accessor` will have access to the store when dropped, allowing
4042    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4043    /// panicking.
4044    ///
4045    /// Note that this will leave the object in an inconsistent and unusable
4046    /// state, so it should only be used just prior to dropping it.
4047    pub(crate) fn take_fibers_and_futures(
4048        &mut self,
4049        fibers: &mut Vec<StoreFiber<'static>>,
4050        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4051    ) {
4052        for entry in self.table.iter_mut() {
4053            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4054                for mode in mem::take(&mut set.waiting).into_values() {
4055                    if let WaitMode::Fiber(fiber) = mode {
4056                        fibers.push(fiber);
4057                    }
4058                }
4059            }
4060        }
4061
4062        if let Some(fiber) = self.worker.take() {
4063            fibers.push(fiber);
4064        }
4065
4066        let mut take_items = |list| {
4067            for item in mem::take(list) {
4068                match item {
4069                    WorkItem::ResumeFiber(fiber) => {
4070                        fibers.push(fiber);
4071                    }
4072                    WorkItem::PushFuture(future) => {
4073                        self.futures
4074                            .get_mut()
4075                            .unwrap()
4076                            .as_mut()
4077                            .unwrap()
4078                            .push(future.into_inner().unwrap());
4079                    }
4080                    _ => {}
4081                }
4082            }
4083        };
4084
4085        take_items(&mut self.high_priority);
4086        take_items(&mut self.low_priority);
4087
4088        if let Some(them) = self.futures.get_mut().unwrap().take() {
4089            futures.push(them);
4090        }
4091    }
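
    // Illustrative disposal sketch (hypothetical caller-side code, not the
    // actual `Store` teardown path; the exact signatures of
    // `StoreFiber::dispose` and `tls::set` are assumed here for illustration):
    //
    //     let mut fibers = Vec::new();
    //     let mut futures = Vec::new();
    //     state.take_fibers_and_futures(&mut fibers, &mut futures);
    //     // Resolve the fibers first so none of them can touch the store
    //     // after it is dropped...
    //     for fiber in fibers {
    //         fiber.dispose(&mut store);
    //     }
    //     // ...then drop the futures with the store reachable via `tls::set`,
    //     // so futures closing over an `&Accessor` can be dropped safely.
    //     tls::set(&mut store, || drop(futures));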
4092
4093    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4094        self.instance_states.entry(instance).or_default()
4095    }
4096
4097    fn push<V: Send + Sync + 'static>(&mut self, value: V) -> Result<TableId<V>, TableError> {
4098        self.table.push(value)
4099    }
4100
4101    fn get<V: 'static>(&self, id: TableId<V>) -> Result<&V, TableError> {
4102        self.table.get(id)
4103    }
4104
4105    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, TableError> {
4106        self.table.get_mut(id)
4107    }
4108
4109    pub fn add_child<T, U>(
4110        &mut self,
4111        child: TableId<T>,
4112        parent: TableId<U>,
4113    ) -> Result<(), TableError> {
4114        self.table.add_child(child, parent)
4115    }
4116
4117    pub fn remove_child<T, U>(
4118        &mut self,
4119        child: TableId<T>,
4120        parent: TableId<U>,
4121    ) -> Result<(), TableError> {
4122        self.table.remove_child(child, parent)
4123    }
4124
4125    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, TableError> {
4126        self.table.delete(id)
4127    }
4128
4129    fn push_future(&mut self, future: HostTaskFuture) {
4130        // Note that we can't directly push to `ConcurrentState::futures` here
4131        // since this may be called from a future that's being polled inside
4132        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4133        // so it has exclusive access while polling it.  Therefore, we push a
4134        // work item to the "high priority" queue, which will actually push to
4135        // `ConcurrentState::futures` later.
4136        self.push_high_priority(WorkItem::PushFuture(Mutex::new(future)));
4137    }
4138
4139    fn push_high_priority(&mut self, item: WorkItem) {
4140        log::trace!("push high priority: {item:?}");
4141        self.high_priority.push(item);
4142    }
4143
4144    fn push_low_priority(&mut self, item: WorkItem) {
4145        log::trace!("push low priority: {item:?}");
4146        self.low_priority.push(item);
4147    }
4148
4149    /// Determine whether the instance associated with the specified guest task
4150    /// may be entered (i.e. is not already on the async call stack).
4151    ///
4152    /// This is an additional check on top of the "may_enter" instance flag;
4153    /// it's needed because async-lifted exports with callback functions must
4154    /// not call their own instances, directly or indirectly, and due to the
4155    /// "stackless" nature of callback-enabled guest tasks such reentrance may
4156    /// happen even when there are no activation records on the stack for that
4157    /// instance (i.e. its "may_enter" field is `true`).
4158    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4159        let guest_instance = self.get(guest_task).unwrap().instance;
4160
4161        // Walk the task tree back to the root, looking for potential
4162        // reentrance.
4163        //
4164        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4165    /// such that each bit represents an instance which has been entered by
4166        // that task or an ancestor of that task, in which case this would be a
4167        // constant time check.
4168        loop {
4169            match &self.get_mut(guest_task).unwrap().caller {
4170                Caller::Host { .. } => break true,
4171                Caller::Guest { task, instance } => {
4172                    if *instance == guest_instance {
4173                        break false;
4174                    } else {
4175                        guest_task = *task;
4176                    }
4177                }
4178            }
4179        }
4180    }
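
    // Hypothetical sketch of the bitset optimization mentioned in the TODO
    // above (not part of the current implementation; it assumes fewer than 64
    // sub-instances and a `u32` representation of
    // `RuntimeComponentInstanceIndex`):
    //
    //     struct EnteredInstances(u64);
    //
    //     impl EnteredInstances {
    //         fn enter(&mut self, instance: u32) {
    //             self.0 |= 1 << instance;
    //         }
    //         fn contains(&self, instance: u32) -> bool {
    //             self.0 & (1 << instance) != 0
    //         }
    //     }
    //
    // Each `GuestTask` would inherit its caller's set when created and add its
    // own instance, making `may_enter` a constant-time check.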
4181
4182    /// Record that we're about to enter a (sub-)component instance which does
4183    /// not support more than one concurrent, stackful activation, meaning it
4184    /// cannot be entered again until the next call returns.
4185    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4186        self.instance_state(instance).do_not_enter = true;
4187    }
4188
4189    /// Record that we've exited a (sub-)component instance previously entered
4190    /// with `Self::enter_instance`, then call `Self::partition_pending`.
4191    /// See the documentation for the latter for details.
4192    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4193        self.instance_state(instance).do_not_enter = false;
4194        self.partition_pending(instance)
4195    }
4196
4197    /// Iterate over `InstanceState::pending`, moving any ready items into the
4198    /// "high priority" work item queue.
4199    ///
4200    /// See `GuestCall::is_ready` for details.
4201    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4202        for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4203            let call = GuestCall { task, kind };
4204            if call.is_ready(self)? {
4205                self.push_high_priority(WorkItem::GuestCall(call));
4206            } else {
4207                self.instance_state(instance)
4208                    .pending
4209                    .insert(call.task, call.kind);
4210            }
4211        }
4212
4213        Ok(())
4214    }
4215
4216    /// Implements the `backpressure.set` intrinsic.
4217    pub(crate) fn backpressure_set(
4218        &mut self,
4219        caller_instance: RuntimeComponentInstanceIndex,
4220        enabled: u32,
4221    ) -> Result<()> {
4222        let state = self.instance_state(caller_instance);
4223        let old = state.backpressure;
4224        let new = enabled != 0;
4225        state.backpressure = new;
4226
4227        if old && !new {
4228            // Backpressure was previously enabled and is now disabled; move any
4229            // newly-eligible guest calls to the "high priority" queue.
4230            self.partition_pending(caller_instance)?;
4231        }
4232
4233        Ok(())
4234    }
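
    // Illustrative effect of the intrinsic above (a sketch; whether a pending
    // call actually runs also depends on `GuestCall::is_ready`):
    //
    //     state.backpressure_set(caller_instance, 1)?; // park new calls to this instance
    //     state.backpressure_set(caller_instance, 0)?; // re-check pending calls, queue ready ones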
4235
4236    /// Implements the `context.get` intrinsic.
4237    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4238        let task = self.guest_task.unwrap();
4239        let val = self.get(task)?.context[usize::try_from(slot).unwrap()];
4240        log::trace!("context_get {task:?} slot {slot} val {val:#x}");
4241        Ok(val)
4242    }
4243
4244    /// Implements the `context.set` intrinsic.
4245    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4246        let task = self.guest_task.unwrap();
4247        log::trace!("context_set {task:?} slot {slot} val {val:#x}");
4248        self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
4249        Ok(())
4250    }
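
    // Illustrative round trip through the two intrinsics above for the current
    // guest task (a sketch, assuming `self.guest_task` is set):
    //
    //     state.context_set(0, 0x1234)?;
    //     assert_eq!(state.context_get(0)?, 0x1234);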
4251
4252    fn options(&self, options: OptionsIndex) -> &CanonicalOptions {
4253        &self.component.env_component().options[options]
4254    }
4255}
4256
4257/// Provide a type hint to the compiler about the shape of a parameter-lowering
4258/// closure.
4259fn for_any_lower<
4260    F: FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4261>(
4262    fun: F,
4263) -> F {
4264    fun
4265}
4266
4267/// Provide a type hint to the compiler about the shape of a result-lifting closure.
4268fn for_any_lift<
4269    F: FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4270        + Send
4271        + Sync,
4272>(
4273    fun: F,
4274) -> F {
4275    fun
4276}
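
// Illustrative use of the two identity helpers above (a sketch): passing a
// closure through `for_any_lower`/`for_any_lift` pins its inferred type to the
// exact higher-ranked `FnOnce` signature expected where it is later boxed (see
// `prepare_call` below):
//
//     let lower = for_any_lower(move |store, instance, params| {
//         // ... write the lowered parameters into `params` here ...
//         Ok(())
//     });
//     // `Box::new(lower)` now coerces cleanly where a boxed lower-params
//     // closure trait object is expected.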
4277
4278/// Wrap the specified future in a `poll_fn` which asserts that the future is
4279/// only polled from the event loop of the specified `Instance`.
4280///
4281/// See `Instance::run_concurrent` for details.
4282fn checked<F: Future + Send + 'static>(
4283    instance: Instance,
4284    fut: F,
4285) -> impl Future<Output = F::Output> + Send + 'static {
4286    async move {
4287        let mut fut = pin!(fut);
4288        future::poll_fn(move |cx| {
4289            let message = "\
4290                `Future`s which depend on asynchronous component tasks, streams, or \
4291                futures to complete may only be polled from the event loop of the \
4292                instance from which they originated.  Please use \
4293                `Instance::{run_concurrent,spawn}` to poll or await them.\
4294            ";
4295            tls::try_get(|store| {
4296                let matched = match store {
4297                    tls::TryGet::Some(store) => {
4298                        let a = store.concurrent_async_state_mut().current_instance;
4299                        a == Some(instance.id().instance())
4300                    }
4301                    tls::TryGet::Taken | tls::TryGet::None => false,
4302                };
4303
4304                if !matched {
4305                    panic!("{message}")
4306                }
4307            });
4308            fut.as_mut().poll(cx)
4309        })
4310        .await
4311    }
4312}
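
// Illustrative use of `checked` (a sketch mirroring `queue_call` below): wrap
// a result future so that polling it outside the originating instance's event
// loop panics with the message above rather than misbehaving silently:
//
//     let fut = checked(instance, rx.map(|result| result.map_err(anyhow::Error::from)));
//     // `fut` may now only be polled from that instance's event loop, e.g.
//     // from within `Instance::run_concurrent`.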
4313
4314/// Assert that `Instance::run_concurrent` has not been called from within an
4315/// instance's event loop.
4316fn check_recursive_run() {
4317    tls::try_get(|store| {
4318        if !matches!(store, tls::TryGet::None) {
4319            panic!("Recursive `Instance::run_concurrent` calls not supported")
4320        }
4321    });
4322}
4323
4324fn unpack_callback_code(code: u32) -> (u32, u32) {
4325    (code & 0xF, code >> 4)
4326}
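
// Illustrative unpacking (the arithmetic matches the function above; the
// interpretation, per the async callback ABI, is that the low four bits carry
// the callback code and the remaining bits carry its payload, e.g. a
// waitable-set handle):
//
//     assert_eq!(unpack_callback_code(0x23), (0x3, 0x2));
//     assert_eq!(unpack_callback_code(0x0), (0x0, 0x0));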
4327
4328/// Helper struct for packaging parameters to be passed to
4329/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4330/// `waitable-set.poll`.
4331struct WaitableCheckParams {
4332    set: TableId<WaitableSet>,
4333    options: OptionsIndex,
4334    payload: u32,
4335}
4336
4337/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4338enum WaitableCheck {
4339    Wait(WaitableCheckParams),
4340    Poll(WaitableCheckParams),
4341    Yield,
4342}
4343
4344/// Represents a guest task called from the host, prepared using `prepare_call`.
4345pub(crate) struct PreparedCall<R> {
4346    /// The guest export to be called.
4347    handle: Func,
4348    /// The guest task created by `prepare_call`.
4349    task: TableId<GuestTask>,
4350    /// The number of lowered core Wasm parameters to pass to the call.
4351    param_count: usize,
4352    /// The `oneshot::Receiver` to which the result of the call will be
4353    /// delivered when it is available.
4354    rx: oneshot::Receiver<LiftedResult>,
4355    _phantom: PhantomData<R>,
4356}
4357
4358impl<R> PreparedCall<R> {
4359    /// Get a copy of the `TaskId` for this `PreparedCall`.
4360    pub(crate) fn task_id(&self) -> TaskId {
4361        TaskId {
4362            handle: self.handle,
4363            task: self.task,
4364        }
4365    }
4366}
4367
4368/// Represents a task created by `prepare_call`.
4369pub(crate) struct TaskId {
4370    handle: Func,
4371    task: TableId<GuestTask>,
4372}
4373
4374impl TaskId {
4375    /// Remove the specified task from the concurrent state to which it belongs.
4376    ///
4377    /// This must be used with care to avoid use-after-delete or double-delete
4378    /// bugs.  Specifically, it should only be called on tasks created with the
4379    /// `remove_task_automatically` parameter to `prepare_call` set to `false`,
4380    /// which tells the runtime that the caller is responsible for removing the
4381    /// task from the state; otherwise, it will be removed automatically.  Also,
4382    /// it should only be called once for a given task, and only after either
4383    /// the task has completed or the instance has trapped.
4384    pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4385        Waitable::Guest(self.task).delete_from(self.handle.instance().concurrent_state_mut(store.0))
4386    }
4387}
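
// Illustrative manual-removal pattern (a hypothetical caller sketch): code
// which passes `remove_task_automatically: false` to `prepare_call` must
// remove the task itself, exactly once, after it completes or the instance
// traps:
//
//     let prepared = prepare_call::<T, R>(/* ..., remove_task_automatically: false, ... */)?;
//     let task = prepared.task_id();
//     // ... queue the call and drive it to completion or a trap ...
//     task.remove(store.as_context_mut())?;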
4388
4389/// Prepare a call to the specified exported Wasm function, providing functions
4390/// for lowering the parameters and lifting the result.
4391///
4392/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4393/// loop, use `queue_call`.
4394pub(crate) fn prepare_call<T, R>(
4395    mut store: StoreContextMut<T>,
4396    handle: Func,
4397    param_count: usize,
4398    remove_task_automatically: bool,
4399    call_post_return_automatically: bool,
4400    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4401    + Send
4402    + Sync
4403    + 'static,
4404    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4405    + Send
4406    + Sync
4407    + 'static,
4408) -> Result<PreparedCall<R>> {
4409    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4410
4411    let instance = handle.instance().id().get(store.0);
4412    let task_return_type = instance.component().types()[ty].results;
4413    let component_instance = raw_options.instance;
4414    let callback = options.callback();
4415    let memory = options.memory_raw().map(SendSyncPtr::new);
4416    let string_encoding = options.string_encoding();
4417    let token = StoreToken::new(store.as_context_mut());
4418    let state = handle.instance().concurrent_state_mut(store.0);
4419
4420    assert!(state.guest_task.is_none());
4421
4422    let (tx, rx) = oneshot::channel();
4423
4424    let mut task = GuestTask::new(
4425        state,
4426        Box::new(for_any_lower(move |store, instance, params| {
4427            debug_assert!(instance.id() == handle.instance().id());
4428            lower_params(handle, token.as_context_mut(store), params)
4429        })),
4430        LiftResult {
4431            lift: Box::new(for_any_lift(move |store, instance, result| {
4432                debug_assert!(instance.id() == handle.instance().id());
4433                lift_result(handle, store, result)
4434            })),
4435            ty: task_return_type,
4436            memory,
4437            string_encoding,
4438        },
4439        Caller::Host {
4440            tx: Some(tx),
4441            remove_task_automatically,
4442            call_post_return_automatically,
4443        },
4444        callback.map(|callback| {
4445            let callback = SendSyncPtr::new(callback);
4446            Box::new(
4447                move |store: &mut dyn VMStore,
4448                      instance: Instance,
4449                      runtime_instance,
4450                      event,
4451                      handle| {
4452                    let store = token.as_context_mut(store);
4453                    // SAFETY: Per the contract of `prepare_call`, the callback
4454                    // will remain valid at least as long as this task exists.
4455                    unsafe {
4456                        instance.call_callback(
4457                            store,
4458                            runtime_instance,
4459                            callback,
4460                            event,
4461                            handle,
4462                            call_post_return_automatically,
4463                        )
4464                    }
4465                },
4466            ) as CallbackFn
4467        }),
4468        component_instance,
4469    )?;
4470    task.function_index = Some(handle.index());
4471
4472    let task = state.push(task)?;
4473
4474    Ok(PreparedCall {
4475        handle,
4476        task,
4477        param_count,
4478        rx,
4479        _phantom: PhantomData,
4480    })
4481}
4482
4483/// Queue a call previously prepared using `prepare_call` to be run as part of
4484/// the associated `ComponentInstance`'s event loop.
4485///
4486/// The returned future will resolve to the result once it is available, but
4487/// must only be polled via the instance's event loop. See
4488/// `Instance::run_concurrent` for details.
4489pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4490    mut store: StoreContextMut<T>,
4491    prepared: PreparedCall<R>,
4492) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
4493    let PreparedCall {
4494        handle,
4495        task,
4496        param_count,
4497        rx,
4498        ..
4499    } = prepared;
4500
4501    queue_call0(store.as_context_mut(), handle, task, param_count)?;
4502
4503    Ok(checked(
4504        handle.instance(),
4505        rx.map(|result| {
4506            result
4507                .map(|v| *v.downcast().unwrap())
4508                .map_err(anyhow::Error::from)
4509        }),
4510    ))
4511}
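
// Illustrative composition of `prepare_call` and `queue_call` (a sketch, not
// the actual `[Typed]Func::call_concurrent` implementation):
//
//     let prepared = prepare_call::<T, R>(
//         store.as_context_mut(),
//         func,
//         param_count,
//         /* remove_task_automatically */ true,
//         /* call_post_return_automatically */ true,
//         lower_params,
//         lift_result,
//     )?;
//     let result_future = queue_call(store.as_context_mut(), prepared)?;
//     // `result_future` resolves to the lifted result but must be awaited
//     // from the instance's event loop (e.g. via `Instance::run_concurrent`).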
4512
4513/// Queue a call previously prepared using `prepare_call` to be run as part of
4514/// the associated `ComponentInstance`'s event loop.
4515fn queue_call0<T: 'static>(
4516    store: StoreContextMut<T>,
4517    handle: Func,
4518    guest_task: TableId<GuestTask>,
4519    param_count: usize,
4520) -> Result<()> {
4521    let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4522    let is_concurrent = raw_options.async_;
4523    let instance = handle.instance();
4524    let callee = handle.lifted_core_func(store.0);
4525    let callback = options.callback();
4526    let post_return = handle.post_return_core_func(store.0);
4527
4528    log::trace!("queueing call {guest_task:?}");
4529
4530    let instance_flags = if callback.is_none() {
4531        None
4532    } else {
4533        Some(flags)
4534    };
4535
4536    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
4537    // (with signatures appropriate for this call) and will remain valid as
4538    // long as this instance is valid.
4539    unsafe {
4540        instance.queue_call(
4541            store,
4542            guest_task,
4543            SendSyncPtr::new(callee),
4544            param_count,
4545            1,
4546            instance_flags,
4547            is_concurrent,
4548            callback.map(SendSyncPtr::new),
4549            post_return.map(SendSyncPtr::new),
4550        )
4551    }
4552}