wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `ComponentInstance` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `Instance::run_concurrent`.  The `ComponentInstance::poll_until`
14//! function contains the loop itself, while the
15//! `ComponentInstance::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `Instance::run_concurrent`: Run the event loop for the specified instance,
25//! allowing any and all tasks belonging to that instance to make progress.
26//!
27//! - `Instance::spawn`: Run a background task as part of the event loop for the
28//! specified instance.
29//!
30//! - `Instance::{future,stream}`: Create a new Component Model `future` or
31//! `stream`; the read end may be passed to the guest.
32//!
33//! - `{Future,Stream}Reader::read` and `{Future,Stream}Writer::write`: read
34//! from or write to a future or stream, respectively.
35//!
36//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
37//!
38//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
39//! function with the linker.  That function will take an `Accessor` as its
40//! first parameter, which provides access to the store and instance between
41//! (but not across) await points.
42//!
43//! - `Accessor::with`: Access the store, its associated data, and the current
44//! instance.
45//!
46//! - `Accessor::spawn`: Run a background task as part of the event loop for the
47//! specified instance.  This is equivalent to `Instance::spawn` but more
48//! convenient to use in host functions.
49
50use crate::component::func::{self, Func, Options};
51use crate::component::{
52    Component, ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable,
53    ResourceTableError,
54};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{
58    CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59};
60use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
61use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
62use anyhow::{Context as _, Result, anyhow, bail};
63use error_contexts::GlobalErrorContextRefCount;
64use futures::channel::oneshot;
65use futures::future::{self, Either, FutureExt};
66use futures::stream::{FuturesUnordered, StreamExt};
67use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
68use std::any::Any;
69use std::borrow::ToOwned;
70use std::boxed::Box;
71use std::cell::UnsafeCell;
72use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
73use std::fmt;
74use std::future::Future;
75use std::marker::PhantomData;
76use std::mem::{self, ManuallyDrop, MaybeUninit};
77use std::ops::DerefMut;
78use std::pin::{Pin, pin};
79use std::ptr::{self, NonNull};
80use std::slice;
81use std::sync::Arc;
82use std::task::{Context, Poll, Waker};
83use std::vec::Vec;
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
87    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88    RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
89    TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
90    TypeTupleIndex,
91};
92
93pub use abort::JoinHandle;
94pub use futures_and_streams::{
95    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
96    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
97    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
98};
99pub(crate) use futures_and_streams::{
100    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
101};
102
103mod abort;
104mod error_contexts;
105mod futures_and_streams;
106mod table;
107pub(crate) mod tls;
108
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// The value has all 32 bits set (i.e. it equals `u32::MAX`).
const BLOCKED: u32 = 0xffff_ffff;
112
/// Corresponds to `CallState` in the upstream spec.
///
/// The explicit discriminants are the integer values handed to the guest (see
/// [`Status::pack`]).
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combine this status with an optional `waitable` handle into the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// The status occupies the low 4 bits and the waitable, when present, the
    /// upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied for `Status::Returned`, if a waitable
    /// is missing for any other status, or if the waitable does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // Number of low bits reserved for the status itself.
        const STATUS_BITS: u32 = 4;

        // A waitable must be absent exactly when the call has returned.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or(0);
        assert!(waitable < (1 << 28));

        let status = self as u32;
        status | (waitable << STATUS_BITS)
    }
}
136
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// See `Event::parts` for how each variant is lowered to the pair of integers
/// delivered to the guest.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a zero payload.
    None,
    /// Cancellation event; lowered with a zero payload.
    Cancelled,
    /// Subtask event; the lowered payload is `status` as an integer.
    Subtask {
        status: Status,
    },
    /// Stream read event; the lowered payload is `code.encode()`.
    StreamRead {
        code: ReturnCode,
        // Not part of the lowered pair produced by `Event::parts`.
        // NOTE(review): presumably the stream type and handle of the
        // operation this event refers to -- confirm against the users.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Stream write event; the lowered payload is `code.encode()`.
    StreamWrite {
        code: ReturnCode,
        // Not part of the lowered pair produced by `Event::parts`.
        // NOTE(review): presumably the stream type and handle of the
        // operation this event refers to -- confirm against the users.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Future read event; the lowered payload is `code.encode()`.
    FutureRead {
        code: ReturnCode,
        // Not part of the lowered pair produced by `Event::parts`.
        // NOTE(review): presumably the future type and handle of the
        // operation this event refers to -- confirm against the users.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Future write event; the lowered payload is `code.encode()`.
    FutureWrite {
        code: ReturnCode,
        // Not part of the lowered pair produced by `Event::parts`.
        // NOTE(review): presumably the future type and handle of the
        // operation this event refers to -- confirm against the users.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
163
164impl Event {
165    /// Lower this event to core Wasm integers for delivery to the guest.
166    ///
167    /// Note that the waitable handle, if any, is assumed to be lowered
168    /// separately.
169    fn parts(self) -> (u32, u32) {
170        const EVENT_NONE: u32 = 0;
171        const EVENT_SUBTASK: u32 = 1;
172        const EVENT_STREAM_READ: u32 = 2;
173        const EVENT_STREAM_WRITE: u32 = 3;
174        const EVENT_FUTURE_READ: u32 = 4;
175        const EVENT_FUTURE_WRITE: u32 = 5;
176        const EVENT_CANCELLED: u32 = 6;
177        match self {
178            Event::None => (EVENT_NONE, 0),
179            Event::Cancelled => (EVENT_CANCELLED, 0),
180            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
181            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
182            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
183            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
184            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
185        }
186    }
187}
188
/// Corresponds to `CallbackCode` in the spec.
///
/// These are the codes returned by an async-lifted export or its callback; see
/// `ComponentInstance::handle_callback_code` for how each one is handled.
mod callback_code {
    /// The task has exited: it is either deleted or marked as exited with its
    /// callback cleared.
    pub const EXIT: u32 = 0;
    /// The task yields; it is re-queued at low priority so that other tasks
    /// get a chance to run first.
    pub const YIELD: u32 = 1;
    /// The task waits for an event to be published on a waitable set.
    pub const WAIT: u32 = 2;
    /// The task polls a waitable set: if no event is immediately available it
    /// yields and checks again afterwards.
    pub const POLL: u32 = 3;
}
196
/// A flag indicating that the callee is an async-lowered export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// Mirrors `wasmtime_environ::component::START_FLAG_ASYNC_CALLEE`, converted
/// to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
201
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    // The store this value grants access to.
    store: StoreContextMut<'a, T>,
    // Projection from the store's `T` to `D::Data`, used by `Access::get`.
    get_data: fn(&mut T) -> D::Data<'_>,
    // The instance this access belongs to.  This is `None` for values created
    // with `Access::new`; `Accessor::with` copies it from the accessor.
    instance: Option<Instance>,
}
212
213impl<'a, T, D> Access<'a, T, D>
214where
215    D: HasData + ?Sized,
216    T: 'static,
217{
218    /// Creates a new [`Access`] from its component parts.
219    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
220        Self {
221            store,
222            get_data,
223            instance: None,
224        }
225    }
226
227    /// Get mutable access to the store data.
228    pub fn data_mut(&mut self) -> &mut T {
229        self.store.data_mut()
230    }
231
232    /// Get mutable access to the store data.
233    pub fn get(&mut self) -> D::Data<'_> {
234        (self.get_data)(self.data_mut())
235    }
236
237    /// Spawn a background task.
238    ///
239    /// See [`Accessor::spawn`] for details.
240    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
241    where
242        T: 'static,
243    {
244        let accessor = Accessor {
245            get_data: self.get_data,
246            instance: self.instance,
247            token: StoreToken::new(self.store.as_context_mut()),
248        };
249        self.instance
250            .unwrap()
251            .spawn_with_accessor(self.store.as_context_mut(), accessor, task)
252    }
253
254    /// Retrieve the component instance of the caller.
255    pub fn instance(&self) -> Instance {
256        self.instance.unwrap()
257    }
258}
259
260impl<'a, T, D> AsContext for Access<'a, T, D>
261where
262    D: HasData + ?Sized,
263    T: 'static,
264{
265    type Data = T;
266
267    fn as_context(&self) -> StoreContext<'_, T> {
268        self.store.as_context()
269    }
270}
271
272impl<'a, T, D> AsContextMut for Access<'a, T, D>
273where
274    D: HasData + ?Sized,
275    T: 'static,
276{
277    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
278        self.store.as_context_mut()
279    }
280}
281
/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they will panic and have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    // Used by `Accessor::with` to turn the ambient store into a
    // `StoreContextMut<T>`.
    token: StoreToken<T>,
    // Projection from the store's `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
    // The instance this accessor belongs to, if any; `Accessor::instance` and
    // `Accessor::spawn` unwrap this.
    instance: Option<Instance>,
}
349
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through [`Instance::run_concurrent`]
/// for example or in a host function through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The returned reference borrows from `self`.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
376
377impl<T: AsAccessor + ?Sized> AsAccessor for &T {
378    type Data = T::Data;
379    type AccessorData = T::AccessorData;
380
381    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
382        T::as_accessor(self)
383    }
384}
385
386impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
387    type Data = T;
388    type AccessorData = D;
389
390    fn as_accessor(&self) -> &Accessor<T, D> {
391        self
392    }
393}
394
395// Note that it is intentional at this time that `Accessor` does not actually
396// store `&mut T` or anything similar. This distinctly enables the `Accessor`
397// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
398// that matter). This is used to ergonomically simplify bindings where the
399// majority of the time `Accessor` is closed over in a future which then needs
400// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
401// you already have to write `T: 'static`...) it helps to avoid this.
402//
403// Note as well that `Accessor` doesn't actually store its data at all. Instead
404// it's more of a "proof" of what can be accessed from TLS. API design around
405// `Accessor` and functions like `Linker::func_wrap_concurrent` are
406// intentionally made to ensure that `Accessor` is ideally only used in the
407// context that TLS variables are actually set. For example host functions are
408// given `&Accessor`, not `Accessor`, and this prevents them from persisting
409// the value outside of a future. Within the future the TLS variables are all
410// guaranteed to be set while the future is being polled.
411//
412// Finally though this is not an ironclad guarantee, but nor does it need to be.
413// The TLS APIs are designed to panic or otherwise model usage where they're
414// called recursively or similar. It's hoped that code cannot be constructed to
415// actually hit this at runtime but this is not a safety requirement at this
416// time.
417const _: () = {
418    const fn assert<T: Send + Sync>() {}
419    assert::<Accessor<UnsafeCell<u32>>>();
420};
421
422impl<T> Accessor<T> {
423    /// Creates a new `Accessor` backed by the specified functions.
424    ///
425    /// - `get`: used to retrieve the store
426    ///
427    /// - `get_data`: used to "project" from the store's associated data to
428    /// another type (e.g. a field of that data or a wrapper around it).
429    ///
430    /// - `spawn`: used to queue spawned background tasks to be run later
431    ///
432    /// - `instance`: used to access the `Instance` to which this `Accessor`
433    /// (and the future which closes over it) belongs
434    pub(crate) fn new(token: StoreToken<T>, instance: Option<Instance>) -> Self {
435        Self {
436            token,
437            get_data: |x| x,
438            instance,
439        }
440    }
441}
442
443impl<T, D> Accessor<T, D>
444where
445    D: HasData + ?Sized,
446{
447    /// Run the specified closure, passing it mutable access to the store.
448    ///
449    /// This function is one of the main building blocks of the [`Accessor`]
450    /// type. This yields synchronous, blocking, access to store via an
451    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
452    /// providing the ability to access `D` via [`Access::get`]. Note that the
453    /// `fun` here is given only temporary access to the store and `T`/`D`
454    /// meaning that the return value `R` here is not allowed to capture borrows
455    /// into the two. If access is needed to data within `T` or `D` outside of
456    /// this closure then it must be `clone`d out, for example.
457    ///
458    /// # Panics
459    ///
460    /// This function will panic if it is call recursively with any other
461    /// accessor already in scope. For example if `with` is called within `fun`,
462    /// then this function will panic. It is up to the embedder to ensure that
463    /// this does not happen.
464    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465        tls::get(|vmstore| {
466            fun(Access {
467                store: self.token.as_context_mut(vmstore),
468                get_data: self.get_data,
469                instance: self.instance,
470            })
471        })
472    }
473
474    /// Returns the getter this accessor is using to project from `T` into
475    /// `D::Data`.
476    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
477        self.get_data
478    }
479
480    /// Changes this accessor to access `D2` instead of the current type
481    /// parameter `D`.
482    ///
483    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
484    ///
485    /// # Panics
486    ///
487    /// When using this API the returned value is disconnected from `&self` and
488    /// the lifetime binding the `self` argument. An `Accessor` only works
489    /// within the context of the closure or async closure that it was
490    /// originally given to, however. This means that due to the fact that the
491    /// returned value has no lifetime connection it's possible to use the
492    /// accessor outside of `&self`, the original accessor, and panic.
493    ///
494    /// The returned value should only be used within the scope of the original
495    /// `Accessor` that `self` refers to.
496    pub fn with_getter<D2: HasData>(
497        &self,
498        get_data: fn(&mut T) -> D2::Data<'_>,
499    ) -> Accessor<T, D2> {
500        Accessor {
501            token: self.token,
502            get_data,
503            instance: self.instance,
504        }
505    }
506
507    /// Spawn a background task which will receive an `&Accessor<T, D>` and
508    /// run concurrently with any other tasks in progress for the current
509    /// instance.
510    ///
511    /// This is particularly useful for host functions which return a `stream`
512    /// or `future` such that the code to write to the write end of that
513    /// `stream` or `future` must run after the function returns.
514    ///
515    /// The returned [`JoinHandle`] may be used to cancel the task.
516    ///
517    /// # Panics
518    ///
519    /// Panics if called within a closure provided to the [`Accessor::with`]
520    /// function. This can only be called outside an active invocation of
521    /// [`Accessor::with`].
522    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
523    where
524        T: 'static,
525    {
526        let instance = self.instance.unwrap();
527        let accessor = self.clone_for_spawn();
528        self.with(|mut access| {
529            instance.spawn_with_accessor(access.as_context_mut(), accessor, task)
530        })
531    }
532
533    /// Retrieve the component instance of the caller.
534    pub fn instance(&self) -> Instance {
535        self.instance.unwrap()
536    }
537
538    fn clone_for_spawn(&self) -> Self {
539        Self {
540            token: self.token,
541            get_data: self.get_data,
542            instance: self.instance,
543        }
544    }
545}
546
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `Instance::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// `AsyncFnOnce` is still nightly-only in latest stable Rust version as of this
// writing (1.84.1), and even with 1.85.0-beta it's not possible to specify
// e.g. `Send` and `Sync` bounds on the `Future` type returned by an
// `AsyncFnOnce`.  Also, using `F: Future<Output = Result<()>> + Send + Sync,
// FN: FnOnce(&Accessor<T>) -> F + Send + Sync + 'static` fails with a type
// mismatch error when we try to pass it an async closure (e.g. `async move |_|
// { ... }`).  So this seems to be the best we can do for the time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// The task is consumed and given a borrowed `accessor` through which it
    /// can reach the store; the returned future must be `Send` and resolves
    /// to the task's result `R`.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
566
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        // NOTE(review): presumably the raw parameter values supplied by the
        // caller -- confirm against the code constructing this.
        params: Vec<ValRaw>,
        // NOTE(review): presumably whether the callee produces a result --
        // confirm.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        // NOTE(review): presumably the raw parameter values supplied by the
        // caller -- confirm against the code constructing this.
        params: Vec<ValRaw>,
        // NOTE(review): presumably the number of flat results expected by the
        // caller -- confirm.
        result_count: u32,
    },
}
581
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// Carries a fiber (presumably the suspended one to resume once an event
    /// arrives -- confirm against the code consuming this).
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback,
}
590
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The task which is waiting.
        task: TableId<GuestTask>,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    ///
    /// `task` is presumably the task which is yielding -- confirm.
    Yielding { task: TableId<GuestTask> },
}
607
/// Represents a pending call into guest code for a given guest task.
///
/// This is the "what to do" half of a `GuestCall`; the task itself is stored
/// alongside it in `GuestCall::task`.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// The closure is invoked with the store and the instance.
    Start(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>),
}
623
624impl fmt::Debug for GuestCallKind {
625    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
626        match self {
627            Self::DeliverEvent { set } => f.debug_struct("DeliverEvent").field("set", set).finish(),
628            Self::Start(_) => f.debug_tuple("Start").finish(),
629        }
630    }
631}
632
/// Represents a pending call into guest code for a given guest task.
#[derive(Debug)]
struct GuestCall {
    /// The guest task the call is made on behalf of.
    task: TableId<GuestTask>,
    /// What the call does: deliver an event or start the task.
    kind: GuestCallKind,
}
639
640impl GuestCall {
641    /// Returns whether or not the call is ready to run.
642    ///
643    /// A call will not be ready to run if either:
644    ///
645    /// - the (sub-)component instance to be called has already been entered and
646    /// cannot be reentered until an in-progress call completes
647    ///
648    /// - the call is for a not-yet started task and the (sub-)component
649    /// instance to be called has backpressure enabled
650    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
651        let task_instance = state.get_mut(self.task)?.instance;
652        let state = state.instance_state(task_instance);
653        let ready = match &self.kind {
654            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
655            GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure > 0),
656        };
657        log::trace!(
658            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
659            state.do_not_enter,
660            state.backpressure
661        );
662        Ok(ready)
663    }
664}
665
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary closure which is invoked with the store and the instance.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}
671
/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
///
/// This is the payload of `WorkItem::Poll`.
#[derive(Debug)]
struct PollParams {
    /// Identifies the polling task.
    task: TableId<GuestTask>,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}
681
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// `Debug` is implemented by hand for this type; variants whose payload is a
/// future, fiber, or closure are printed by name only.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}
696
697impl fmt::Debug for WorkItem {
698    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
699        match self {
700            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
701            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
702            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
703            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
704            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
705        }
706    }
707}
708
709impl ComponentInstance {
710    /// Handle the `CallbackCode` returned from an async-lifted export or its
711    /// callback.
712    ///
713    /// If `initial_call` is `true`, then the code was received from the
714    /// async-lifted export; otherwise, it was received from its callback.
715    fn handle_callback_code(
716        mut self: Pin<&mut Self>,
717        guest_task: TableId<GuestTask>,
718        runtime_instance: RuntimeComponentInstanceIndex,
719        code: u32,
720        initial_call: bool,
721    ) -> Result<()> {
722        let (code, set) = unpack_callback_code(code);
723
724        log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");
725
726        let state = self.as_mut().concurrent_state_mut();
727        let task = state.get_mut(guest_task)?;
728
729        if task.lift_result.is_some() {
730            if code == callback_code::EXIT {
731                return Err(anyhow!(crate::Trap::NoAsyncResult));
732            }
733            if initial_call {
734                // Notify any current or future waiters that this subtask has
735                // started.
736                Waitable::Guest(guest_task).set_event(
737                    state,
738                    Some(Event::Subtask {
739                        status: Status::Started,
740                    }),
741                )?;
742            }
743        }
744
745        let get_set = |instance: Pin<&mut Self>, handle| {
746            if handle == 0 {
747                bail!("invalid waitable-set handle");
748            }
749
750            let set = instance.guest_tables().0[runtime_instance].waitable_set_rep(handle)?;
751
752            Ok(TableId::<WaitableSet>::new(set))
753        };
754
755        match code {
756            callback_code::EXIT => {
757                let task = state.get_mut(guest_task)?;
758                match &task.caller {
759                    Caller::Host {
760                        remove_task_automatically,
761                        ..
762                    } => {
763                        if *remove_task_automatically {
764                            log::trace!("handle_callback_code will delete task {guest_task:?}");
765                            Waitable::Guest(guest_task).delete_from(state)?;
766                        }
767                    }
768                    Caller::Guest { .. } => {
769                        task.exited = true;
770                        task.callback = None;
771                    }
772                }
773            }
774            callback_code::YIELD => {
775                // Push this task onto the "low priority" queue so it runs after
776                // any other tasks have had a chance to run.
777                let task = state.get_mut(guest_task)?;
778                assert!(task.event.is_none());
779                task.event = Some(Event::None);
780                state.push_low_priority(WorkItem::GuestCall(GuestCall {
781                    task: guest_task,
782                    kind: GuestCallKind::DeliverEvent { set: None },
783                }));
784            }
785            callback_code::WAIT | callback_code::POLL => {
786                let set = get_set(self.as_mut(), set)?;
787                let state = self.concurrent_state_mut();
788
789                if state.get_mut(guest_task)?.event.is_some()
790                    || !state.get_mut(set)?.ready.is_empty()
791                {
792                    // An event is immediately available; deliver it ASAP.
793                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
794                        task: guest_task,
795                        kind: GuestCallKind::DeliverEvent { set: Some(set) },
796                    }));
797                } else {
798                    // No event is immediately available.
799                    match code {
800                        callback_code::POLL => {
801                            // We're polling, so just yield and check whether an
802                            // event has arrived after that.
803                            state.push_low_priority(WorkItem::Poll(PollParams {
804                                task: guest_task,
805                                set,
806                            }));
807                        }
808                        callback_code::WAIT => {
809                            // We're waiting, so register to be woken up when an
810                            // event is published for this waitable set.
811                            //
812                            // Here we also set `GuestTask::wake_on_cancel`
813                            // which allows `subtask.cancel` to interrupt the
814                            // wait.
815                            let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
816                            assert!(old.is_none());
817                            let old = state
818                                .get_mut(set)?
819                                .waiting
820                                .insert(guest_task, WaitMode::Callback);
821                            assert!(old.is_none());
822                        }
823                        _ => unreachable!(),
824                    }
825                }
826            }
827            _ => bail!("unsupported callback code: {code}"),
828        }
829
830        Ok(())
831    }
832
833    /// Get the next pending event for the specified task and (optional)
834    /// waitable set, along with the waitable handle if applicable.
835    fn get_event(
836        mut self: Pin<&mut Self>,
837        guest_task: TableId<GuestTask>,
838        set: Option<TableId<WaitableSet>>,
839        cancellable: bool,
840    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
841        let state = self.as_mut().concurrent_state_mut();
842
843        if let Some(event) = state.get_mut(guest_task)?.event.take() {
844            log::trace!("deliver event {event:?} to {guest_task:?}");
845
846            if cancellable || !matches!(event, Event::Cancelled) {
847                return Ok(Some((event, None)));
848            } else {
849                state.get_mut(guest_task)?.event = Some(event);
850            }
851        }
852
853        Ok(
854            if let Some((set, waitable)) = set
855                .and_then(|set| {
856                    state
857                        .get_mut(set)
858                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
859                        .transpose()
860                })
861                .transpose()?
862            {
863                let common = waitable.common(state)?;
864                let handle = common.handle.unwrap();
865                let event = common.event.take().unwrap();
866
867                log::trace!(
868                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
869                );
870
871                waitable.on_delivery(self, event);
872
873                Some((event, Some((waitable, handle))))
874            } else {
875                None
876            },
877        )
878    }
879
880    /// Implements the `waitable-set.new` intrinsic.
881    pub(crate) fn waitable_set_new(
882        mut self: Pin<&mut Self>,
883        caller_instance: RuntimeComponentInstanceIndex,
884    ) -> Result<u32> {
885        self.check_may_leave(caller_instance)?;
886        let set = self
887            .as_mut()
888            .concurrent_state_mut()
889            .push(WaitableSet::default())?;
890        let handle = self.guest_tables().0[caller_instance].waitable_set_insert(set.rep())?;
891        log::trace!("new waitable set {set:?} (handle {handle})");
892        Ok(handle)
893    }
894
895    /// Implements the `waitable-set.drop` intrinsic.
896    pub(crate) fn waitable_set_drop(
897        mut self: Pin<&mut Self>,
898        caller_instance: RuntimeComponentInstanceIndex,
899        set: u32,
900    ) -> Result<()> {
901        self.check_may_leave(caller_instance)?;
902        let rep = self.as_mut().guest_tables().0[caller_instance].waitable_set_remove(set)?;
903
904        log::trace!("drop waitable set {rep} (handle {set})");
905
906        let set = self
907            .concurrent_state_mut()
908            .delete(TableId::<WaitableSet>::new(rep))?;
909
910        if !set.waiting.is_empty() {
911            bail!("cannot drop waitable set with waiters");
912        }
913
914        Ok(())
915    }
916
917    /// Implements the `waitable.join` intrinsic.
918    pub(crate) fn waitable_join(
919        mut self: Pin<&mut Self>,
920        caller_instance: RuntimeComponentInstanceIndex,
921        waitable_handle: u32,
922        set_handle: u32,
923    ) -> Result<()> {
924        self.check_may_leave(caller_instance)?;
925        let waitable = Waitable::from_instance(self.as_mut(), caller_instance, waitable_handle)?;
926
927        let set = if set_handle == 0 {
928            None
929        } else {
930            let set =
931                self.as_mut().guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
932
933            Some(TableId::<WaitableSet>::new(set))
934        };
935
936        log::trace!(
937            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
938        );
939
940        waitable.join(self.concurrent_state_mut(), set)
941    }
942
943    /// Implements the `subtask.drop` intrinsic.
944    pub(crate) fn subtask_drop(
945        mut self: Pin<&mut Self>,
946        caller_instance: RuntimeComponentInstanceIndex,
947        task_id: u32,
948    ) -> Result<()> {
949        self.check_may_leave(caller_instance)?;
950        self.as_mut().waitable_join(caller_instance, task_id, 0)?;
951
952        let (rep, is_host) =
953            self.as_mut().guest_tables().0[caller_instance].subtask_remove(task_id)?;
954
955        let concurrent_state = self.concurrent_state_mut();
956        let (waitable, expected_caller_instance, delete) = if is_host {
957            let id = TableId::<HostTask>::new(rep);
958            let task = concurrent_state.get_mut(id)?;
959            if task.join_handle.is_some() {
960                bail!("cannot drop a subtask which has not yet resolved");
961            }
962            (Waitable::Host(id), task.caller_instance, true)
963        } else {
964            let id = TableId::<GuestTask>::new(rep);
965            let task = concurrent_state.get_mut(id)?;
966            if task.lift_result.is_some() {
967                bail!("cannot drop a subtask which has not yet resolved");
968            }
969            if let Caller::Guest { instance, .. } = &task.caller {
970                (Waitable::Guest(id), *instance, task.exited)
971            } else {
972                unreachable!()
973            }
974        };
975
976        waitable.common(concurrent_state)?.handle = None;
977
978        if waitable.take_event(concurrent_state)?.is_some() {
979            bail!("cannot drop a subtask with an undelivered event");
980        }
981
982        if delete {
983            waitable.delete_from(concurrent_state)?;
984        }
985
986        // Since waitables can neither be passed between instances nor forged,
987        // this should never fail unless there's a bug in Wasmtime, but we check
988        // here to be sure:
989        assert_eq!(expected_caller_instance, caller_instance);
990        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
991        Ok(())
992    }
993}
994
995impl Instance {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this instance are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked tables, queues, or maps is non-empty, or
    /// if a current guest task is still recorded.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) {
        let mut instance = self.id().get_mut(store.as_context_mut().0);
        // Every per-instance guest table must be empty.
        assert!(
            instance
                .as_mut()
                .guest_tables()
                .0
                .iter()
                .all(|(_, table)| table.is_empty())
        );
        let state = instance.concurrent_state_mut();
        // The concurrent-state table itself must be empty; its contents are
        // printed on failure to help identify what leaked.
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        // No queued work items and no task marked as currently running.
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_task.is_none());
        // `futures` is expected to be present (`Some`) here and to contain no
        // outstanding futures.
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        // No instance may still have entries in its `pending` collection.
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }
1033
    /// Run the specified closure `fun` to completion as part of this instance's
    /// event loop.
    ///
    /// Like [`Self::run`], this will run `fun` as part of this instance's event
    /// loop until it yields a result _or_ there are no more tasks to run.
    /// Unlike [`Self::run`], `fun` is provided an [`Accessor`], which provides
    /// controlled access to the `Store` and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Errors
    ///
    /// Returns whatever error the event loop (`poll_until`) produces, e.g.
    /// `Trap::AsyncDeadlock` if no further progress can be made before the
    /// future returned by `fun` completes.
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   anyhow::{Result},
    /// #   wasmtime::{
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// instance.run_concurrent(&mut store, async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<T, R>(
        self,
        mut store: impl AsContextMut<Data = T>,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects re-entrant use of the event loop;
        // the helper is defined outside this section -- confirm.
        check_recursive_run();
        let mut store = store.as_context_mut();
        let token = StoreToken::new(store.as_context_mut());

        // Holds the future returned by `fun` alongside the store so that the
        // future is always dropped inside `tls::set`, i.e. with the store
        // installed in thread-local state while its destructor runs.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        // `accessor` is declared before `dropper`, so it is dropped after it;
        // the future stored in `dropper` borrows `accessor`.
        let accessor = &Accessor::new(token, Some(self));
        let dropper = &mut Dropper {
            store,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        // Drive the event loop until `future` resolves (or the loop fails).
        self.poll_until(dropper.store.as_context_mut(), future)
            .await
    }
1118
1119    /// Spawn a background task to run as part of this instance's event loop.
1120    ///
1121    /// The task will receive an `&Accessor<U>` and run concurrently with
1122    /// any other tasks in progress for the instance.
1123    ///
1124    /// Note that the task will only make progress if and when the event loop
1125    /// for this instance is run.
1126    ///
1127    /// The returned [`SpawnHandle`] may be used to cancel the task.
1128    pub fn spawn<U: 'static>(
1129        self,
1130        mut store: impl AsContextMut<Data = U>,
1131        task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
1132    ) -> JoinHandle {
1133        let mut store = store.as_context_mut();
1134        let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
1135        self.spawn_with_accessor(store, accessor, task)
1136    }
1137
1138    /// Internal implementation of `spawn` functions where a `store` is
1139    /// available along with an `Accessor`.
1140    fn spawn_with_accessor<T, D>(
1141        self,
1142        mut store: StoreContextMut<T>,
1143        accessor: Accessor<T, D>,
1144        task: impl AccessorTask<T, D, Result<()>>,
1145    ) -> JoinHandle
1146    where
1147        T: 'static,
1148        D: HasData + ?Sized,
1149    {
1150        let store = store.as_context_mut();
1151
1152        // Create an "abortable future" here where internally the future will
1153        // hook calls to poll and possibly spawn more background tasks on each
1154        // iteration.
1155        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1156        self.concurrent_state_mut(store.0)
1157            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1158
1159        handle
1160    }
1161
    /// Run this instance's event loop.
    ///
    /// The returned future will resolve when either the specified future
    /// completes (in which case we return its result) or no further progress
    /// can be made (in which case we trap with `Trap::AsyncDeadlock`).
    ///
    /// Each iteration of the loop polls, in order: `future` itself, then
    /// `ConcurrentState::futures`, then the "high priority" work queue, and
    /// finally the "low priority" work queue.  Any work items collected are
    /// handled one at a time via `handle_work_item` before looping again.
    ///
    /// # Errors
    ///
    /// Besides the deadlock trap, an error is returned if any future in
    /// `ConcurrentState::futures` resolves to an error or if handling a work
    /// item fails.
    async fn poll_until<T, R>(
        self,
        mut store: StoreContextMut<'_, T>,
        mut future: Pin<&mut impl Future<Output = R>>,
    ) -> Result<R>
    where
        T: Send,
    {
        loop {
            // Take `ConcurrentState::futures` out of the instance so we can
            // poll it while also safely giving any of the futures inside access
            // to `self`.
            let mut futures = self
                .concurrent_state_mut(store.0)
                .futures
                .get_mut()
                .take()
                .unwrap();
            let mut next = pin!(futures.next());

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The resulting `next` is `Ready(true)` if a future completed
                // successfully, `Ready(false)` if there are no futures left,
                // and `Pending` otherwise.
                let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                let mut instance = self.id().get_mut(store.0);

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = instance.as_mut().concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    self.set_tls(store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed, meaning we're
                                    // stuck, so we return an error.  The
                                    // underlying assumption is that `future`
                                    // depends on this component instance making
                                    // such progress, and thus there's no point
                                    // in continuing to poll it given we've run
                                    // out of work to do.
                                    //
                                    // Note that we'd also reach this point if
                                    // the host embedder passed e.g. a
                                    // `std::future::Pending` to
                                    // `Instance::run_concurrent`, in which case
                                    // we'd return a "deadlock" error even when
                                    // any and all tasks have completed
                                    // normally.  However, that's not how
                                    // `Instance::run_concurrent` is intended
                                    // (and documented) to be used, so it seems
                                    // reasonable to lump that case in with
                                    // "real" deadlocks.
                                    //
                                    // TODO: Once we've added host APIs for
                                    // cancelling in-progress tasks, we can
                                    // return some other, non-error value here,
                                    // treating it as "normal" and giving the
                                    // host embedder a chance to intervene by
                                    // cancelling one or more tasks and/or
                                    // starting new tasks capable of waking the
                                    // existing ones.
                                    Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the instance before
            // we return or handle any work items since one or more of those
            // items might append more futures.
            *self.concurrent_state_mut(store.0).futures.get_mut() = Some(futures);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    // Drop guard which owns the not-yet-handled work items.
                    // If we leave this scope early (e.g. via `?` below), any
                    // items still in the iterator are cleaned up: fibers are
                    // disposed and pushed futures are dropped inside
                    // `tls::set`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: store.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Pull items through `dispose.ready` (rather than a `for`
                    // loop over a separate iterator) so the guard always holds
                    // exactly the items which remain unhandled.
                    while let Some(item) = dispose.ready.next() {
                        self.handle_work_item(dispose.store.as_context_mut(), item)
                            .await?;
                    }
                }
            }
        }
    }
1334
1335    /// Handle the specified work item, possibly resuming a fiber if applicable.
1336    async fn handle_work_item<T: Send>(
1337        self,
1338        store: StoreContextMut<'_, T>,
1339        item: WorkItem,
1340    ) -> Result<()> {
1341        log::trace!("handle work item {item:?}");
1342        match item {
1343            WorkItem::PushFuture(future) => {
1344                self.concurrent_state_mut(store.0)
1345                    .futures
1346                    .get_mut()
1347                    .as_mut()
1348                    .unwrap()
1349                    .push(future.into_inner());
1350            }
1351            WorkItem::ResumeFiber(fiber) => {
1352                self.resume_fiber(store.0, fiber).await?;
1353            }
1354            WorkItem::GuestCall(call) => {
1355                let state = self.concurrent_state_mut(store.0);
1356                if call.is_ready(state)? {
1357                    self.run_on_worker(store, WorkerItem::GuestCall(call))
1358                        .await?;
1359                } else {
1360                    let task = state.get_mut(call.task)?;
1361                    if !task.starting_sent {
1362                        task.starting_sent = true;
1363                        if let GuestCallKind::Start(_) = &call.kind {
1364                            Waitable::Guest(call.task).set_event(
1365                                state,
1366                                Some(Event::Subtask {
1367                                    status: Status::Starting,
1368                                }),
1369                            )?;
1370                        }
1371                    }
1372
1373                    let runtime_instance = state.get_mut(call.task)?.instance;
1374                    state
1375                        .instance_state(runtime_instance)
1376                        .pending
1377                        .insert(call.task, call.kind);
1378                }
1379            }
1380            WorkItem::Poll(params) => {
1381                let state = self.concurrent_state_mut(store.0);
1382                if state.get_mut(params.task)?.event.is_some()
1383                    || !state.get_mut(params.set)?.ready.is_empty()
1384                {
1385                    // There's at least one event immediately available; deliver
1386                    // it to the guest ASAP.
1387                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1388                        task: params.task,
1389                        kind: GuestCallKind::DeliverEvent {
1390                            set: Some(params.set),
1391                        },
1392                    }));
1393                } else {
1394                    // There are no events immediately available; deliver
1395                    // `Event::None` to the guest.
1396                    state.get_mut(params.task)?.event = Some(Event::None);
1397                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1398                        task: params.task,
1399                        kind: GuestCallKind::DeliverEvent {
1400                            set: Some(params.set),
1401                        },
1402                    }));
1403                }
1404            }
1405            WorkItem::WorkerFunction(fun) => {
1406                self.run_on_worker(store, WorkerItem::Function(fun)).await?;
1407            }
1408        }
1409
1410        Ok(())
1411    }
1412
1413    /// Resume the specified fiber, giving it exclusive access to the specified
1414    /// store.
1415    async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> {
1416        let old_task = self.concurrent_state_mut(store).guest_task;
1417        log::trace!("resume_fiber: save current task {old_task:?}");
1418
1419        let fiber = fiber::resolve_or_release(store, fiber).await?;
1420
1421        let state = self.concurrent_state_mut(store);
1422
1423        state.guest_task = old_task;
1424        log::trace!("resume_fiber: restore current task {old_task:?}");
1425
1426        if let Some(mut fiber) = fiber {
1427            // See the `SuspendReason` documentation for what each case means.
1428            match state.suspend_reason.take().unwrap() {
1429                SuspendReason::NeedWork => {
1430                    if state.worker.is_none() {
1431                        state.worker = Some(fiber);
1432                    } else {
1433                        fiber.dispose(store);
1434                    }
1435                }
1436                SuspendReason::Yielding { .. } => {
1437                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1438                }
1439                SuspendReason::Waiting { set, task } => {
1440                    let old = state
1441                        .get_mut(set)?
1442                        .waiting
1443                        .insert(task, WaitMode::Fiber(fiber));
1444                    assert!(old.is_none());
1445                }
1446            }
1447        }
1448
1449        Ok(())
1450    }
1451
1452    /// Execute the specified guest call on a worker fiber.
1453    async fn run_on_worker<T: Send>(
1454        self,
1455        store: StoreContextMut<'_, T>,
1456        item: WorkerItem,
1457    ) -> Result<()> {
1458        let worker = if let Some(fiber) = self.concurrent_state_mut(store.0).worker.take() {
1459            fiber
1460        } else {
1461            fiber::make_fiber(store.0, move |store| {
1462                loop {
1463                    match self.concurrent_state_mut(store).worker_item.take().unwrap() {
1464                        WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
1465                        WorkerItem::Function(fun) => fun.into_inner()(store, self)?,
1466                    }
1467
1468                    self.suspend(store, SuspendReason::NeedWork)?;
1469                }
1470            })?
1471        };
1472
1473        let worker_item = &mut self.concurrent_state_mut(store.0).worker_item;
1474        assert!(worker_item.is_none());
1475        *worker_item = Some(item);
1476
1477        self.resume_fiber(store.0, worker).await
1478    }
1479
1480    /// Execute the specified guest call.
1481    fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
1482        match call.kind {
1483            GuestCallKind::DeliverEvent { set } => {
1484                let (event, waitable) = self
1485                    .id()
1486                    .get_mut(store)
1487                    .get_event(call.task, set, true)?
1488                    .unwrap();
1489                let state = self.concurrent_state_mut(store);
1490                let task = state.get_mut(call.task)?;
1491                let runtime_instance = task.instance;
1492                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
1493
1494                log::trace!(
1495                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
1496                    call.task,
1497                );
1498
1499                let old_task = state.guest_task.replace(call.task);
1500                log::trace!(
1501                    "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
1502                    call.task
1503                );
1504
1505                self.maybe_push_call_context(store.store_opaque_mut(), call.task)?;
1506
1507                let state = self.concurrent_state_mut(store);
1508                state.enter_instance(runtime_instance);
1509
1510                let callback = state.get_mut(call.task)?.callback.take().unwrap();
1511
1512                let code = callback(store, self, runtime_instance, event, handle)?;
1513
1514                let state = self.concurrent_state_mut(store);
1515
1516                state.get_mut(call.task)?.callback = Some(callback);
1517
1518                state.exit_instance(runtime_instance)?;
1519
1520                self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?;
1521
1522                self.id().get_mut(store).handle_callback_code(
1523                    call.task,
1524                    runtime_instance,
1525                    code,
1526                    false,
1527                )?;
1528
1529                self.concurrent_state_mut(store).guest_task = old_task;
1530                log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
1531            }
1532            GuestCallKind::Start(fun) => {
1533                fun(store, self)?;
1534            }
1535        }
1536
1537        Ok(())
1538    }
1539
1540    /// Suspend the current fiber, storing the reason in
1541    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1542    /// it should be resumed.
1543    ///
1544    /// See the `SuspendReason` documentation for details.
1545    fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> {
1546        log::trace!("suspend fiber: {reason:?}");
1547
1548        // If we're yielding or waiting on behalf of a guest task, we'll need to
1549        // pop the call context which manages resource borrows before suspending
1550        // and then push it again once we've resumed.
1551        let task = match &reason {
1552            SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
1553            SuspendReason::NeedWork => None,
1554        };
1555
1556        let old_guest_task = if let Some(task) = task {
1557            self.maybe_pop_call_context(store, task)?;
1558            self.concurrent_state_mut(store).guest_task
1559        } else {
1560            None
1561        };
1562
1563        let suspend_reason = &mut self.concurrent_state_mut(store).suspend_reason;
1564        assert!(suspend_reason.is_none());
1565        *suspend_reason = Some(reason);
1566
1567        store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1568
1569        if let Some(task) = task {
1570            self.concurrent_state_mut(store).guest_task = old_guest_task;
1571            self.maybe_push_call_context(store, task)?;
1572        }
1573
1574        Ok(())
1575    }
1576
1577    /// Push the call context for managing resource borrows for the specified
1578    /// guest task if it has not yet either returned a result or cancelled
1579    /// itself.
1580    fn maybe_push_call_context(
1581        self,
1582        store: &mut StoreOpaque,
1583        guest_task: TableId<GuestTask>,
1584    ) -> Result<()> {
1585        let task = self.concurrent_state_mut(store).get_mut(guest_task)?;
1586        if task.lift_result.is_some() {
1587            log::trace!("push call context for {guest_task:?}");
1588            let call_context = task.call_context.take().unwrap();
1589            store.component_resource_state().0.push(call_context);
1590        }
1591        Ok(())
1592    }
1593
1594    /// Pop the call context for managing resource borrows for the specified
1595    /// guest task if it has not yet either returned a result or cancelled
1596    /// itself.
1597    fn maybe_pop_call_context(
1598        self,
1599        store: &mut StoreOpaque,
1600        guest_task: TableId<GuestTask>,
1601    ) -> Result<()> {
1602        if self
1603            .concurrent_state_mut(store)
1604            .get_mut(guest_task)?
1605            .lift_result
1606            .is_some()
1607        {
1608            log::trace!("pop call context for {guest_task:?}");
1609            let call_context = Some(store.component_resource_state().0.pop().unwrap());
1610            self.concurrent_state_mut(store)
1611                .get_mut(guest_task)?
1612                .call_context = call_context;
1613        }
1614        Ok(())
1615    }
1616
1617    /// Add the specified guest call to the "high priority" work item queue, to
1618    /// be started as soon as backpressure and/or reentrance rules allow.
1619    ///
1620    /// SAFETY: The raw pointer arguments must be valid references to guest
1621    /// functions (with the appropriate signatures) when the closures queued by
1622    /// this function are called.
1623    unsafe fn queue_call<T: 'static>(
1624        self,
1625        mut store: StoreContextMut<T>,
1626        guest_task: TableId<GuestTask>,
1627        callee: SendSyncPtr<VMFuncRef>,
1628        param_count: usize,
1629        result_count: usize,
1630        flags: Option<InstanceFlags>,
1631        async_: bool,
1632        callback: Option<SendSyncPtr<VMFuncRef>>,
1633        post_return: Option<SendSyncPtr<VMFuncRef>>,
1634    ) -> Result<()> {
1635        /// Return a closure which will call the specified function in the scope
1636        /// of the specified task.
1637        ///
1638        /// This will use `GuestTask::lower_params` to lower the parameters, but
1639        /// will not lift the result; instead, it returns a
1640        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1641        /// any, may be lifted.  Note that an async-lifted export will have
1642        /// returned its result using the `task.return` intrinsic (or not
1643        /// returned a result at all, in the case of `task.cancel`), in which
1644        /// case the "result" of this call will either be a callback code or
1645        /// nothing.
1646        ///
1647        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1648        /// the returned closure is called.
1649        unsafe fn make_call<T: 'static>(
1650            store: StoreContextMut<T>,
1651            guest_task: TableId<GuestTask>,
1652            callee: SendSyncPtr<VMFuncRef>,
1653            param_count: usize,
1654            result_count: usize,
1655            flags: Option<InstanceFlags>,
1656        ) -> impl FnOnce(
1657            &mut dyn VMStore,
1658            Instance,
1659        ) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1660        + Send
1661        + Sync
1662        + 'static
1663        + use<T> {
1664            let token = StoreToken::new(store);
1665            move |store: &mut dyn VMStore, instance: Instance| {
1666                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1667                let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1668                let may_enter_after_call = task.call_post_return_automatically();
1669                let lower = task.lower_params.take().unwrap();
1670
1671                lower(store, instance, &mut storage[..param_count])?;
1672
1673                let mut store = token.as_context_mut(store);
1674
1675                // SAFETY: Per the contract documented in `make_call's`
1676                // documentation, `callee` must be a valid pointer.
1677                unsafe {
1678                    if let Some(mut flags) = flags {
1679                        flags.set_may_enter(false);
1680                    }
1681                    crate::Func::call_unchecked_raw(
1682                        &mut store,
1683                        callee.as_non_null(),
1684                        NonNull::new(
1685                            &mut storage[..param_count.max(result_count)]
1686                                as *mut [MaybeUninit<ValRaw>] as _,
1687                        )
1688                        .unwrap(),
1689                    )?;
1690                    if let Some(mut flags) = flags {
1691                        flags.set_may_enter(may_enter_after_call);
1692                    }
1693                }
1694
1695                Ok(storage)
1696            }
1697        }
1698
1699        // SAFETY: Per the contract described in this function documentation,
1700        // the `callee` pointer which `call` closes over must be valid when
1701        // called by the closure we queue below.
1702        let call = unsafe {
1703            make_call(
1704                store.as_context_mut(),
1705                guest_task,
1706                callee,
1707                param_count,
1708                result_count,
1709                flags,
1710            )
1711        };
1712
1713        let callee_instance = self
1714            .concurrent_state_mut(store.0)
1715            .get_mut(guest_task)?
1716            .instance;
1717        let fun = if callback.is_some() {
1718            assert!(async_);
1719
1720            Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1721                let old_task = instance
1722                    .concurrent_state_mut(store)
1723                    .guest_task
1724                    .replace(guest_task);
1725                log::trace!(
1726                    "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
1727                );
1728
1729                instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1730
1731                instance
1732                    .concurrent_state_mut(store)
1733                    .enter_instance(callee_instance);
1734
1735                // SAFETY: See the documentation for `make_call` to review the
1736                // contract we must uphold for `call` here.
1737                //
1738                // Per the contract described in the `queue_call`
1739                // documentation, the `callee` pointer which `call` closes
1740                // over must be valid.
1741                let storage = call(store, instance)?;
1742
1743                instance
1744                    .concurrent_state_mut(store)
1745                    .exit_instance(callee_instance)?;
1746
1747                instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1748
1749                let state = instance.concurrent_state_mut(store);
1750                state.guest_task = old_task;
1751                log::trace!("stackless call: restored {old_task:?} as current task");
1752
1753                // SAFETY: `wasmparser` will have validated that the callback
1754                // function returns a `i32` result.
1755                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1756
1757                instance.id().get_mut(store).handle_callback_code(
1758                    guest_task,
1759                    callee_instance,
1760                    code,
1761                    true,
1762                )?;
1763
1764                Ok(())
1765            })
1766                as Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>
1767        } else {
1768            let token = StoreToken::new(store.as_context_mut());
1769            Box::new(move |store: &mut dyn VMStore, instance: Instance| {
1770                let old_task = instance
1771                    .concurrent_state_mut(store)
1772                    .guest_task
1773                    .replace(guest_task);
1774                log::trace!(
1775                    "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
1776                );
1777
1778                let mut flags = instance.id().get(store).instance_flags(callee_instance);
1779
1780                instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
1781
1782                // Unless this is a callback-less (i.e. stackful)
1783                // async-lifted export, we need to record that the instance
1784                // cannot be entered until the call returns.
1785                if !async_ {
1786                    instance
1787                        .concurrent_state_mut(store)
1788                        .enter_instance(callee_instance);
1789                }
1790
1791                // SAFETY: See the documentation for `make_call` to review the
1792                // contract we must uphold for `call` here.
1793                //
1794                // Per the contract described in the `queue_call`
1795                // documentation, the `callee` pointer which `call` closes
1796                // over must be valid.
1797                let storage = call(store, instance)?;
1798
1799                if async_ {
1800                    // This is a callback-less (i.e. stackful) async-lifted
1801                    // export, so there is no post-return function, and
1802                    // either `task.return` or `task.cancel` should have
1803                    // been called.
1804                    if instance
1805                        .concurrent_state_mut(store)
1806                        .get_mut(guest_task)?
1807                        .lift_result
1808                        .is_some()
1809                    {
1810                        return Err(anyhow!(crate::Trap::NoAsyncResult));
1811                    }
1812                } else {
1813                    // This is a sync-lifted export, so now is when we lift the
1814                    // result, optionally call the post-return function, if any,
1815                    // and finally notify any current or future waiters that the
1816                    // subtask has returned.
1817
1818                    let lift = {
1819                        let state = instance.concurrent_state_mut(store);
1820                        state.exit_instance(callee_instance)?;
1821
1822                        assert!(state.get_mut(guest_task)?.result.is_none());
1823
1824                        state.get_mut(guest_task)?.lift_result.take().unwrap()
1825                    };
1826
1827                    // SAFETY: `result_count` represents the number of core Wasm
1828                    // results returned, per `wasmparser`.
1829                    let result = (lift.lift)(store, instance, unsafe {
1830                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1831                            &storage[..result_count],
1832                        )
1833                    })?;
1834
1835                    let post_return_arg = match result_count {
1836                        0 => ValRaw::i32(0),
1837                        // SAFETY: `result_count` represents the number of
1838                        // core Wasm results returned, per `wasmparser`.
1839                        1 => unsafe { storage[0].assume_init() },
1840                        _ => unreachable!(),
1841                    };
1842
1843                    if instance
1844                        .concurrent_state_mut(store)
1845                        .get_mut(guest_task)?
1846                        .call_post_return_automatically()
1847                    {
1848                        unsafe {
1849                            flags.set_may_leave(false);
1850                            flags.set_needs_post_return(false);
1851                        }
1852
1853                        if let Some(func) = post_return {
1854                            let mut store = token.as_context_mut(store);
1855
1856                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1857                            // either `wasmtime-cranelift`-generated fused adapter
1858                            // code or `component::Options`.  Per `wasmparser`
1859                            // post-return signature validation, we know it takes a
1860                            // single parameter.
1861                            unsafe {
1862                                crate::Func::call_unchecked_raw(
1863                                    &mut store,
1864                                    func.as_non_null(),
1865                                    slice::from_ref(&post_return_arg).into(),
1866                                )?;
1867                            }
1868                        }
1869
1870                        unsafe {
1871                            flags.set_may_leave(true);
1872                            flags.set_may_enter(true);
1873                        }
1874                    }
1875
1876                    instance.task_complete(
1877                        store,
1878                        guest_task,
1879                        result,
1880                        Status::Returned,
1881                        post_return_arg,
1882                    )?;
1883                }
1884
1885                instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
1886
1887                let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
1888
1889                match &task.caller {
1890                    Caller::Host {
1891                        remove_task_automatically,
1892                        ..
1893                    } => {
1894                        if *remove_task_automatically {
1895                            Waitable::Guest(guest_task)
1896                                .delete_from(instance.concurrent_state_mut(store))?;
1897                        }
1898                    }
1899                    Caller::Guest { .. } => {
1900                        task.exited = true;
1901                    }
1902                }
1903
1904                Ok(())
1905            })
1906        };
1907
1908        self.concurrent_state_mut(store.0)
1909            .push_high_priority(WorkItem::GuestCall(GuestCall {
1910                task: guest_task,
1911                kind: GuestCallKind::Start(fun),
1912            }));
1913
1914        Ok(())
1915    }
1916
1917    /// Prepare (but do not start) a guest->guest call.
1918    ///
1919    /// This is called from fused adapter code generated in
1920    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
1921    /// are synthesized Wasm functions which move the parameters from the caller
1922    /// to the callee and the result from the callee to the caller,
1923    /// respectively.  The adapter will call `Self::start_call` immediately
1924    /// after calling this function.
1925    ///
1926    /// SAFETY: All the pointer arguments must be valid pointers to guest
1927    /// entities (and with the expected signatures for the function references
1928    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
1929    unsafe fn prepare_call<T: 'static>(
1930        self,
1931        mut store: StoreContextMut<T>,
1932        start: *mut VMFuncRef,
1933        return_: *mut VMFuncRef,
1934        caller_instance: RuntimeComponentInstanceIndex,
1935        callee_instance: RuntimeComponentInstanceIndex,
1936        task_return_type: TypeTupleIndex,
1937        memory: *mut VMMemoryDefinition,
1938        string_encoding: u8,
1939        caller_info: CallerInfo,
1940    ) -> Result<()> {
1941        self.id().get(store.0).check_may_leave(caller_instance)?;
1942
1943        enum ResultInfo {
1944            Heap { results: u32 },
1945            Stack { result_count: u32 },
1946        }
1947
1948        let result_info = match &caller_info {
1949            CallerInfo::Async {
1950                has_result: true,
1951                params,
1952            } => ResultInfo::Heap {
1953                results: params.last().unwrap().get_u32(),
1954            },
1955            CallerInfo::Async {
1956                has_result: false, ..
1957            } => ResultInfo::Stack { result_count: 0 },
1958            CallerInfo::Sync {
1959                result_count,
1960                params,
1961            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
1962                results: params.last().unwrap().get_u32(),
1963            },
1964            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
1965                result_count: *result_count,
1966            },
1967        };
1968
1969        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
1970
1971        // Create a new guest task for the call, closing over the `start` and
1972        // `return_` functions to lift the parameters and lower the result,
1973        // respectively.
1974        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
1975        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
1976        let token = StoreToken::new(store.as_context_mut());
1977        let state = self.concurrent_state_mut(store.0);
1978        let old_task = state.guest_task.take();
1979        let new_task = GuestTask::new(
1980            state,
1981            Box::new(move |store, instance, dst| {
1982                let mut store = token.as_context_mut(store);
1983                assert!(dst.len() <= MAX_FLAT_PARAMS);
1984                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1985                let count = match caller_info {
1986                    // Async callers, if they have a result, use the last
1987                    // parameter as a return pointer so chop that off if
1988                    // relevant here.
1989                    CallerInfo::Async { params, has_result } => {
1990                        let params = &params[..params.len() - usize::from(has_result)];
1991                        for (param, src) in params.iter().zip(&mut src) {
1992                            src.write(*param);
1993                        }
1994                        params.len()
1995                    }
1996
1997                    // Sync callers forward everything directly.
1998                    CallerInfo::Sync { params, .. } => {
1999                        for (param, src) in params.iter().zip(&mut src) {
2000                            src.write(*param);
2001                        }
2002                        params.len()
2003                    }
2004                };
2005                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2006                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2007                // how it was constructed (see
2008                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2009                // for details) we know it takes count parameters and returns
2010                // `dst.len()` results.
2011                unsafe {
2012                    crate::Func::call_unchecked_raw(
2013                        &mut store,
2014                        start.as_non_null(),
2015                        NonNull::new(
2016                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2017                        )
2018                        .unwrap(),
2019                    )?;
2020                }
2021                dst.copy_from_slice(&src[..dst.len()]);
2022                let state = instance.concurrent_state_mut(store.0);
2023                let task = state.guest_task.unwrap();
2024                Waitable::Guest(task).set_event(
2025                    state,
2026                    Some(Event::Subtask {
2027                        status: Status::Started,
2028                    }),
2029                )?;
2030                Ok(())
2031            }),
2032            LiftResult {
2033                lift: Box::new(move |store, instance, src| {
2034                    // SAFETY: See comment in closure passed as `lower_params`
2035                    // parameter above.
2036                    let mut store = token.as_context_mut(store);
2037                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2038                    if let ResultInfo::Heap { results } = &result_info {
2039                        my_src.push(ValRaw::u32(*results));
2040                    }
2041                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2042                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2043                    // on how it was constructed (see
2044                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2045                    // for details) we know it takes `src.len()` parameters and
2046                    // returns up to 1 result.
2047                    unsafe {
2048                        crate::Func::call_unchecked_raw(
2049                            &mut store,
2050                            return_.as_non_null(),
2051                            my_src.as_mut_slice().into(),
2052                        )?;
2053                    }
2054                    let state = instance.concurrent_state_mut(store.0);
2055                    let task = state.guest_task.unwrap();
2056                    if sync_caller {
2057                        state.get_mut(task)?.sync_result =
2058                            Some(if let ResultInfo::Stack { result_count } = &result_info {
2059                                match result_count {
2060                                    0 => None,
2061                                    1 => Some(my_src[0]),
2062                                    _ => unreachable!(),
2063                                }
2064                            } else {
2065                                None
2066                            });
2067                    }
2068                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2069                }),
2070                ty: task_return_type,
2071                memory: NonNull::new(memory).map(SendSyncPtr::new),
2072                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2073            },
2074            Caller::Guest {
2075                task: old_task.unwrap(),
2076                instance: caller_instance,
2077            },
2078            None,
2079            callee_instance,
2080        )?;
2081
2082        let guest_task = state.push(new_task)?;
2083
2084        if let Some(old_task) = old_task {
2085            if !state.may_enter(guest_task) {
2086                bail!(crate::Trap::CannotEnterComponent);
2087            }
2088
2089            state.get_mut(old_task)?.subtasks.insert(guest_task);
2090        };
2091
2092        // Make the new task the current one so that `Self::start_call` knows
2093        // which one to start.
2094        state.guest_task = Some(guest_task);
2095        log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2096
2097        Ok(())
2098    }
2099
2100    /// Call the specified callback function for an async-lifted export.
2101    ///
2102    /// SAFETY: `function` must be a valid reference to a guest function of the
2103    /// correct signature for a callback.
2104    unsafe fn call_callback<T>(
2105        self,
2106        mut store: StoreContextMut<T>,
2107        callee_instance: RuntimeComponentInstanceIndex,
2108        function: SendSyncPtr<VMFuncRef>,
2109        event: Event,
2110        handle: u32,
2111        may_enter_after_call: bool,
2112    ) -> Result<u32> {
2113        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2114
2115        let (ordinal, result) = event.parts();
2116        let params = &mut [
2117            ValRaw::u32(ordinal),
2118            ValRaw::u32(handle),
2119            ValRaw::u32(result),
2120        ];
2121        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2122        // `wasmtime-cranelift`-generated fused adapter code or
2123        // `component::Options`.  Per `wasmparser` callback signature
2124        // validation, we know it takes three parameters and returns one.
2125        unsafe {
2126            flags.set_may_enter(false);
2127            crate::Func::call_unchecked_raw(
2128                &mut store,
2129                function.as_non_null(),
2130                params.as_mut_slice().into(),
2131            )?;
2132            flags.set_may_enter(may_enter_after_call);
2133        }
2134        Ok(params[0].get_u32())
2135    }
2136
2137    /// Start a guest->guest call previously prepared using
2138    /// `Self::prepare_call`.
2139    ///
2140    /// This is called from fused adapter code generated in
2141    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2142    /// this function immediately after calling `Self::prepare_call`.
2143    ///
2144    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2145    /// functions with the appropriate signatures for the current guest task.
2146    /// If this is a call to an async-lowered import, the actual call may be
2147    /// deferred and run after this function returns, in which case the pointer
2148    /// arguments must also be valid when the call happens.
2149    unsafe fn start_call<T: 'static>(
2150        self,
2151        mut store: StoreContextMut<T>,
2152        callback: *mut VMFuncRef,
2153        post_return: *mut VMFuncRef,
2154        callee: *mut VMFuncRef,
2155        param_count: u32,
2156        result_count: u32,
2157        flags: u32,
2158        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2159    ) -> Result<u32> {
2160        let token = StoreToken::new(store.as_context_mut());
2161        let async_caller = storage.is_none();
2162        let state = self.concurrent_state_mut(store.0);
2163        let guest_task = state.guest_task.unwrap();
2164        let may_enter_after_call = state.get_mut(guest_task)?.call_post_return_automatically();
2165        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2166        let param_count = usize::try_from(param_count).unwrap();
2167        assert!(param_count <= MAX_FLAT_PARAMS);
2168        let result_count = usize::try_from(result_count).unwrap();
2169        assert!(result_count <= MAX_FLAT_RESULTS);
2170
2171        let task = state.get_mut(guest_task)?;
2172        if !callback.is_null() {
2173            // We're calling an async-lifted export with a callback, so store
2174            // the callback and related context as part of the task so we can
2175            // call it later when needed.
2176            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2177            task.callback = Some(Box::new(
2178                move |store, instance, runtime_instance, event, handle| {
2179                    let store = token.as_context_mut(store);
2180                    unsafe {
2181                        instance.call_callback::<T>(
2182                            store,
2183                            runtime_instance,
2184                            callback,
2185                            event,
2186                            handle,
2187                            may_enter_after_call,
2188                        )
2189                    }
2190                },
2191            ));
2192        }
2193
2194        let Caller::Guest {
2195            task: caller,
2196            instance: runtime_instance,
2197        } = &task.caller
2198        else {
2199            // As of this writing, `start_call` is only used for guest->guest
2200            // calls.
2201            unreachable!()
2202        };
2203        let caller = *caller;
2204        let caller_instance = *runtime_instance;
2205
2206        let callee_instance = task.instance;
2207
2208        let instance_flags = if callback.is_null() {
2209            None
2210        } else {
2211            Some(self.id().get(store.0).instance_flags(callee_instance))
2212        };
2213
2214        // Queue the call as a "high priority" work item.
2215        unsafe {
2216            self.queue_call(
2217                store.as_context_mut(),
2218                guest_task,
2219                callee,
2220                param_count,
2221                result_count,
2222                instance_flags,
2223                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2224                NonNull::new(callback).map(SendSyncPtr::new),
2225                NonNull::new(post_return).map(SendSyncPtr::new),
2226            )?;
2227        }
2228
2229        let state = self.concurrent_state_mut(store.0);
2230
2231        // Use the caller's `GuestTask::sync_call_set` to register interest in
2232        // the subtask...
2233        let guest_waitable = Waitable::Guest(guest_task);
2234        let old_set = guest_waitable.common(state)?.set;
2235        let set = state.get_mut(caller)?.sync_call_set;
2236        guest_waitable.join(state, Some(set))?;
2237
2238        // ... and suspend this fiber temporarily while we wait for it to start.
2239        //
2240        // Note that we _could_ call the callee directly using the current fiber
2241        // rather than suspend this one, but that would make reasoning about the
2242        // event loop more complicated and is probably only worth doing if
2243        // there's a measurable performance benefit.  In addition, it would mean
2244        // blocking the caller if the callee calls a blocking sync-lowered
2245        // import, and as of this writing the spec says we must not do that.
2246        //
2247        // Alternatively, the fused adapter code could be modified to call the
2248        // callee directly without calling a host-provided intrinsic at all (in
2249        // which case it would need to do its own, inline backpressure checks,
2250        // etc.).  Again, we'd want to see a measurable performance benefit
2251        // before committing to such an optimization.  And again, we'd need to
2252        // update the spec to allow that.
2253        let (status, waitable) = loop {
2254            self.suspend(store.0, SuspendReason::Waiting { set, task: caller })?;
2255
2256            let state = self.concurrent_state_mut(store.0);
2257
2258            let event = guest_waitable.take_event(state)?;
2259            let Some(Event::Subtask { status }) = event else {
2260                unreachable!();
2261            };
2262
2263            log::trace!("status {status:?} for {guest_task:?}");
2264
2265            if status == Status::Returned {
2266                // It returned, so we can stop waiting.
2267                break (status, None);
2268            } else if async_caller {
2269                // It hasn't returned yet, but the caller is calling via an
2270                // async-lowered import, so we generate a handle for the task
2271                // waitable and return the status.
2272                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2273                    .subtask_insert_guest(guest_task.rep())?;
2274                self.concurrent_state_mut(store.0)
2275                    .get_mut(guest_task)?
2276                    .common
2277                    .handle = Some(handle);
2278                break (status, Some(handle));
2279            } else {
2280                // The callee hasn't returned yet, and the caller is calling via
2281                // a sync-lowered import, so we loop and keep waiting until the
2282                // callee returns.
2283            }
2284        };
2285
2286        let state = self.concurrent_state_mut(store.0);
2287
2288        guest_waitable.join(state, old_set)?;
2289
2290        if let Some(storage) = storage {
2291            // The caller used a sync-lowered import to call an async-lifted
2292            // export, in which case the result, if any, has been stashed in
2293            // `GuestTask::sync_result`.
2294            let task = state.get_mut(guest_task)?;
2295            if let Some(result) = task.sync_result.take() {
2296                if let Some(result) = result {
2297                    storage[0] = MaybeUninit::new(result);
2298                }
2299
2300                if task.exited {
2301                    Waitable::Guest(guest_task).delete_from(state)?;
2302                }
2303            } else {
2304                // This means the callee failed to call either `task.return` or
2305                // `task.cancel` before exiting.
2306                return Err(anyhow!(crate::Trap::NoAsyncResult));
2307            }
2308        }
2309
2310        // Reset the current task to point to the caller as it resumes control.
2311        state.guest_task = Some(caller);
2312        log::trace!("popped current task {guest_task:?}; new task is {caller:?}");
2313
2314        Ok(status.pack(waitable))
2315    }
2316
2317    /// Wrap the specified host function in a future which will call it, passing
2318    /// it an `&Accessor<T>`.
2319    ///
2320    /// See the `Accessor` documentation for details.
2321    pub(crate) fn wrap_call<T, F, R>(
2322        self,
2323        store: StoreContextMut<T>,
2324        closure: F,
2325    ) -> impl Future<Output = Result<R>> + 'static
2326    where
2327        T: 'static,
2328        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
2329            + Send
2330            + Sync
2331            + 'static,
2332        R: Send + Sync + 'static,
2333    {
2334        let token = StoreToken::new(store);
2335        async move {
2336            let mut accessor = Accessor::new(token, Some(self));
2337            closure(&mut accessor).await
2338        }
2339    }
2340
2341    /// Poll the specified future once on behalf of a guest->host call using an
2342    /// async-lowered import.
2343    ///
2344    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2345    /// `Pending`, add it to the set of futures to be polled as part of this
2346    /// instance's event loop until it completes, and then return
2347    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2348    ///
2349    /// Whether the future returns `Ready` immediately or later, the `lower`
2350    /// function will be used to lower the result, if any, into the guest caller's
2351    /// stack and linear memory unless the task has been cancelled.
2352    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2353        self,
2354        mut store: StoreContextMut<T>,
2355        future: impl Future<Output = Result<R>> + Send + 'static,
2356        caller_instance: RuntimeComponentInstanceIndex,
2357        lower: impl FnOnce(StoreContextMut<T>, Instance, R) -> Result<()> + Send + 'static,
2358    ) -> Result<Option<u32>> {
2359        let token = StoreToken::new(store.as_context_mut());
2360        let state = self.concurrent_state_mut(store.0);
2361        let caller = state.guest_task.unwrap();
2362
2363        // Create an abortable future which hooks calls to poll and manages call
2364        // context state for the future.
2365        let (join_handle, future) = JoinHandle::run(async move {
2366            let mut future = pin!(future);
2367            let mut call_context = None;
2368            future::poll_fn(move |cx| {
2369                // Push the call context for managing any resource borrows
2370                // for the task.
2371                tls::get(|store| {
2372                    if let Some(call_context) = call_context.take() {
2373                        token
2374                            .as_context_mut(store)
2375                            .0
2376                            .component_resource_state()
2377                            .0
2378                            .push(call_context);
2379                    }
2380                });
2381
2382                let result = future.as_mut().poll(cx);
2383
2384                if result.is_pending() {
2385                    // Pop the call context for managing any resource
2386                    // borrows for the task.
2387                    tls::get(|store| {
2388                        call_context = Some(
2389                            token
2390                                .as_context_mut(store)
2391                                .0
2392                                .component_resource_state()
2393                                .0
2394                                .pop()
2395                                .unwrap(),
2396                        );
2397                    });
2398                }
2399                result
2400            })
2401            .await
2402        });
2403
2404        // We create a new host task even though it might complete immediately
2405        // (in which case we won't need to pass a waitable back to the guest).
2406        // If it does complete immediately, we'll remove it before we return.
2407        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2408
2409        log::trace!("new host task child of {caller:?}: {task:?}");
2410
2411        let mut future = Box::pin(future);
2412
2413        // Finally, poll the future.  We can use a dummy `Waker` here because
2414        // we'll add the future to `ConcurrentState::futures` and poll it
2415        // automatically from the event loop if it doesn't complete immediately
2416        // here.
2417        let poll = self.set_tls(store.0, || {
2418            future
2419                .as_mut()
2420                .poll(&mut Context::from_waker(&Waker::noop()))
2421        });
2422
2423        Ok(match poll {
2424            Poll::Ready(None) => unreachable!(),
2425            Poll::Ready(Some(result)) => {
2426                // It finished immediately; lower the result and delete the
2427                // task.
2428                lower(store.as_context_mut(), self, result?)?;
2429                log::trace!("delete host task {task:?} (already ready)");
2430                self.concurrent_state_mut(store.0).delete(task)?;
2431                None
2432            }
2433            Poll::Pending => {
2434                // It hasn't finished yet; add the future to
2435                // `ConcurrentState::futures` so it will be polled by the event
2436                // loop and allocate a waitable handle to return to the guest.
2437
2438                // Wrap the future in a closure responsible for lowering the result into
2439                // the guest's stack and memory, as well as notifying any waiters that
2440                // the task returned.
2441                let future = Box::pin(async move {
2442                    let result = match future.await {
2443                        Some(result) => result?,
2444                        // Task was cancelled; nothing left to do.
2445                        None => return Ok(()),
2446                    };
2447                    tls::get(move |store| {
2448                        // Here we schedule a task to run on a worker fiber to do
2449                        // the lowering since it may involve a call to the guest's
2450                        // realloc function.  This is necessary because calling the
2451                        // guest while there are host embedder frames on the stack
2452                        // is unsound.
2453                        self.concurrent_state_mut(store).push_high_priority(
2454                            WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store, _| {
2455                                lower(token.as_context_mut(store), self, result)?;
2456                                let state = self.concurrent_state_mut(store);
2457                                state.get_mut(task)?.join_handle.take();
2458                                Waitable::Host(task).set_event(
2459                                    state,
2460                                    Some(Event::Subtask {
2461                                        status: Status::Returned,
2462                                    }),
2463                                )
2464                            }))),
2465                        );
2466                        Ok(())
2467                    })
2468                });
2469
2470                self.concurrent_state_mut(store.0).push_future(future);
2471                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2472                    .subtask_insert_host(task.rep())?;
2473                self.concurrent_state_mut(store.0)
2474                    .get_mut(task)?
2475                    .common
2476                    .handle = Some(handle);
2477                log::trace!(
2478                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2479                );
2480                Some(handle)
2481            }
2482        })
2483    }
2484
2485    /// Poll the specified future until it completes on behalf of a guest->host
2486    /// call using a sync-lowered import.
2487    ///
2488    /// This is similar to `Self::first_poll` except it's for sync-lowered
2489    /// imports, meaning we don't need to handle cancellation and we can block
2490    /// the caller until the task completes, at which point the caller can
2491    /// handle lowering the result to the guest's stack and linear memory.
2492    pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
2493        self,
2494        store: &mut dyn VMStore,
2495        future: impl Future<Output = Result<R>> + Send + 'static,
2496        caller_instance: RuntimeComponentInstanceIndex,
2497    ) -> Result<R> {
2498        let state = self.concurrent_state_mut(store);
2499
2500        // If there is no current guest task set, that means the host function
2501        // was registered using e.g. `LinkerInstance::func_wrap`, in which case
2502        // it should complete immediately.
2503        let Some(caller) = state.guest_task else {
2504            return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
2505                Poll::Ready(result) => result,
2506                Poll::Pending => {
2507                    unreachable!()
2508                }
2509            };
2510        };
2511
2512        // Save any existing result stashed in `GuestTask::result` so we can
2513        // replace it with the new result.
2514        let old_result = state
2515            .get_mut(caller)
2516            .with_context(|| format!("bad handle: {caller:?}"))?
2517            .result
2518            .take();
2519
2520        // Add a temporary host task into the table so we can track its
2521        // progress.  Note that we'll never allocate a waitable handle for the
2522        // guest since we're being called synchronously.
2523        let task = state.push(HostTask::new(caller_instance, None))?;
2524
2525        log::trace!("new host task child of {caller:?}: {task:?}");
2526
2527        // Wrap the future in a closure which will take care of stashing the
2528        // result in `GuestTask::result` and resuming this fiber when the host
2529        // task completes.
2530        let mut future = Box::pin(async move {
2531            let result = future.await?;
2532            tls::get(move |store| {
2533                let state = self.concurrent_state_mut(store);
2534                state.get_mut(caller)?.result = Some(Box::new(result) as _);
2535
2536                Waitable::Host(task).set_event(
2537                    state,
2538                    Some(Event::Subtask {
2539                        status: Status::Returned,
2540                    }),
2541                )?;
2542
2543                Ok(())
2544            })
2545        }) as HostTaskFuture;
2546
2547        // Finally, poll the future.  We can use a dummy `Waker` here because
2548        // we'll add the future to `ConcurrentState::futures` and poll it
2549        // automatically from the event loop if it doesn't complete immediately
2550        // here.
2551        let poll = self.set_tls(store, || {
2552            future
2553                .as_mut()
2554                .poll(&mut Context::from_waker(&Waker::noop()))
2555        });
2556
2557        match poll {
2558            Poll::Ready(result) => {
2559                // It completed immediately; check the result and delete the task.
2560                result?;
2561                log::trace!("delete host task {task:?} (already ready)");
2562                self.concurrent_state_mut(store).delete(task)?;
2563            }
2564            Poll::Pending => {
2565                // It did not complete immediately; add it to
2566                // `ConcurrentState::futures` so it will be polled via the event
2567                // loop; then use `GuestTask::sync_call_set` to wait for the
2568                // task to complete, suspending the current fiber until it does
2569                // so.
2570                let state = self.concurrent_state_mut(store);
2571                state.push_future(future);
2572
2573                let set = state.get_mut(caller)?.sync_call_set;
2574                Waitable::Host(task).join(state, Some(set))?;
2575
2576                self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
2577            }
2578        }
2579
2580        // Retrieve and return the result.
2581        Ok(*mem::replace(
2582            &mut self.concurrent_state_mut(store).get_mut(caller)?.result,
2583            old_result,
2584        )
2585        .unwrap()
2586        .downcast()
2587        .unwrap())
2588    }
2589
2590    /// Implements the `task.return` intrinsic, lifting the result for the
2591    /// current guest task.
2592    pub(crate) fn task_return(
2593        self,
2594        store: &mut dyn VMStore,
2595        caller: RuntimeComponentInstanceIndex,
2596        ty: TypeTupleIndex,
2597        options: OptionsIndex,
2598        storage: &[ValRaw],
2599    ) -> Result<()> {
2600        self.id().get(store).check_may_leave(caller)?;
2601        let state = self.concurrent_state_mut(store);
2602        let CanonicalOptions {
2603            string_encoding,
2604            data_model,
2605            ..
2606        } = *state.options(options);
2607        let guest_task = state.guest_task.unwrap();
2608        let lift = state
2609            .get_mut(guest_task)?
2610            .lift_result
2611            .take()
2612            .ok_or_else(|| {
2613                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2614            })?;
2615        assert!(state.get_mut(guest_task)?.result.is_none());
2616
2617        let invalid = ty != lift.ty
2618            || string_encoding != lift.string_encoding
2619            || match data_model {
2620                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2621                    Some(memory) => {
2622                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2623                        let actual = self.id().get(store).runtime_memory(memory);
2624                        expected != actual
2625                    }
2626                    // Memory not specified, meaning it didn't need to be
2627                    // specified per validation, so not invalid.
2628                    None => false,
2629                },
2630                // Always invalid as this isn't supported.
2631                CanonicalOptionsDataModel::Gc { .. } => true,
2632            };
2633
2634        if invalid {
2635            bail!("invalid `task.return` signature and/or options for current task");
2636        }
2637
2638        log::trace!("task.return for {guest_task:?}");
2639
2640        let result = (lift.lift)(store, self, storage)?;
2641
2642        self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2643    }
2644
2645    /// Implements the `task.cancel` intrinsic.
2646    pub(crate) fn task_cancel(
2647        self,
2648        store: &mut dyn VMStore,
2649        caller: RuntimeComponentInstanceIndex,
2650    ) -> Result<()> {
2651        self.id().get(store).check_may_leave(caller)?;
2652        let state = self.concurrent_state_mut(store);
2653        let guest_task = state.guest_task.unwrap();
2654        let task = state.get_mut(guest_task)?;
2655        if !task.cancel_sent {
2656            bail!("`task.cancel` called by task which has not been cancelled")
2657        }
2658        _ = task.lift_result.take().ok_or_else(|| {
2659            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2660        })?;
2661
2662        assert!(task.result.is_none());
2663
2664        log::trace!("task.cancel for {guest_task:?}");
2665
2666        self.task_complete(
2667            store,
2668            guest_task,
2669            Box::new(DummyResult),
2670            Status::ReturnCancelled,
2671            ValRaw::i32(0),
2672        )
2673    }
2674
2675    /// Complete the specified guest task (i.e. indicate that it has either
2676    /// returned a (possibly empty) result or cancelled itself).
2677    ///
2678    /// This will return any resource borrows and notify any current or future
2679    /// waiters that the task has completed.
2680    fn task_complete(
2681        self,
2682        store: &mut dyn VMStore,
2683        guest_task: TableId<GuestTask>,
2684        result: Box<dyn Any + Send + Sync>,
2685        status: Status,
2686        post_return_arg: ValRaw,
2687    ) -> Result<()> {
2688        if self
2689            .concurrent_state_mut(store)
2690            .get_mut(guest_task)?
2691            .call_post_return_automatically()
2692        {
2693            let (calls, host_table, _, instance) = store
2694                .store_opaque_mut()
2695                .component_resource_state_with_instance(self);
2696            ResourceTables {
2697                calls,
2698                host_table: Some(host_table),
2699                guest: Some(instance.guest_tables()),
2700            }
2701            .exit_call()?;
2702        } else {
2703            // As of this writing, the only scenario where `call_post_return_automatically`
2704            // would be false for a `GuestTask` is for host-to-guest calls using
2705            // `[Typed]Func::call_async`, in which case the `function_index`
2706            // should be a non-`None` value.
2707            let function_index = self
2708                .concurrent_state_mut(store)
2709                .get_mut(guest_task)?
2710                .function_index
2711                .unwrap();
2712
2713            self.id()
2714                .get_mut(store)
2715                .post_return_arg_set(function_index, post_return_arg);
2716        }
2717
2718        let state = self.concurrent_state_mut(store);
2719        let task = state.get_mut(guest_task)?;
2720
2721        if let Caller::Host { tx, .. } = &mut task.caller {
2722            if let Some(tx) = tx.take() {
2723                _ = tx.send(result);
2724            }
2725        } else {
2726            task.result = Some(result);
2727            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2728        }
2729
2730        Ok(())
2731    }
2732
2733    /// Implements the `waitable-set.wait` intrinsic.
2734    pub(crate) fn waitable_set_wait(
2735        self,
2736        store: &mut dyn VMStore,
2737        caller: RuntimeComponentInstanceIndex,
2738        options: OptionsIndex,
2739        set: u32,
2740        payload: u32,
2741    ) -> Result<u32> {
2742        self.id().get(store).check_may_leave(caller)?;
2743        let opts = self.concurrent_state_mut(store).options(options);
2744        let cancellable = opts.cancellable;
2745        let caller_instance = opts.instance;
2746        let rep =
2747            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2748
2749        self.waitable_check(
2750            store,
2751            cancellable,
2752            WaitableCheck::Wait(WaitableCheckParams {
2753                set: TableId::new(rep),
2754                options,
2755                payload,
2756            }),
2757        )
2758    }
2759
2760    /// Implements the `waitable-set.poll` intrinsic.
2761    pub(crate) fn waitable_set_poll(
2762        self,
2763        store: &mut dyn VMStore,
2764        caller: RuntimeComponentInstanceIndex,
2765        options: OptionsIndex,
2766        set: u32,
2767        payload: u32,
2768    ) -> Result<u32> {
2769        self.id().get(store).check_may_leave(caller)?;
2770        let opts = self.concurrent_state_mut(store).options(options);
2771        let cancellable = opts.cancellable;
2772        let caller_instance = opts.instance;
2773        let rep =
2774            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2775
2776        self.waitable_check(
2777            store,
2778            cancellable,
2779            WaitableCheck::Poll(WaitableCheckParams {
2780                set: TableId::new(rep),
2781                options,
2782                payload,
2783            }),
2784        )
2785    }
2786
2787    /// Implements the `thread.yield` intrinsic.
2788    pub(crate) fn thread_yield(
2789        self,
2790        store: &mut dyn VMStore,
2791        caller: RuntimeComponentInstanceIndex,
2792        cancellable: bool,
2793    ) -> Result<bool> {
2794        self.id().get(store).check_may_leave(caller)?;
2795        self.waitable_check(store, cancellable, WaitableCheck::Yield)
2796            .map(|_| {
2797                if cancellable {
2798                    let state = self.concurrent_state_mut(store);
2799                    let task = state.guest_task.unwrap();
2800                    if let Some(event) = state.get_mut(task).unwrap().event.take() {
2801                        assert!(matches!(event, Event::Cancelled));
2802                        true
2803                    } else {
2804                        false
2805                    }
2806                } else {
2807                    false
2808                }
2809            })
2810    }
2811
2812    /// Helper function for the `waitable-set.wait`, `waitable-set.poll`, and
2813    /// `yield` intrinsics.
2814    fn waitable_check(
2815        self,
2816        store: &mut dyn VMStore,
2817        cancellable: bool,
2818        check: WaitableCheck,
2819    ) -> Result<u32> {
2820        let guest_task = self.concurrent_state_mut(store).guest_task.unwrap();
2821
2822        let (wait, set) = match &check {
2823            WaitableCheck::Wait(params) => (true, Some(params.set)),
2824            WaitableCheck::Poll(params) => (false, Some(params.set)),
2825            WaitableCheck::Yield => (false, None),
2826        };
2827
2828        // First, suspend this fiber, allowing any other tasks to run.
2829        self.suspend(store, SuspendReason::Yielding { task: guest_task })?;
2830
2831        log::trace!("waitable check for {guest_task:?}; set {set:?}");
2832
2833        let state = self.concurrent_state_mut(store);
2834        let task = state.get_mut(guest_task)?;
2835
2836        if wait && task.callback.is_some() {
2837            bail!("cannot call `task.wait` from async-lifted export with callback");
2838        }
2839
2840        // If we're waiting, and there are no events immediately available,
2841        // suspend the fiber until that changes.
2842        if wait {
2843            let set = set.unwrap();
2844
2845            if (task.event.is_none()
2846                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
2847                && state.get_mut(set)?.ready.is_empty()
2848            {
2849                if cancellable {
2850                    let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
2851                    assert!(old.is_none());
2852                }
2853
2854                self.suspend(
2855                    store,
2856                    SuspendReason::Waiting {
2857                        set,
2858                        task: guest_task,
2859                    },
2860                )?;
2861            }
2862        }
2863
2864        log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
2865
2866        let result = match check {
2867            // Deliver any pending events to the guest and return.
2868            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
2869                let event = self.id().get_mut(store).get_event(
2870                    guest_task,
2871                    Some(params.set),
2872                    cancellable,
2873                )?;
2874
2875                let (ordinal, handle, result) = if wait {
2876                    let (event, waitable) = event.unwrap();
2877                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2878                    let (ordinal, result) = event.parts();
2879                    (ordinal, handle, result)
2880                } else {
2881                    if let Some((event, waitable)) = event {
2882                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2883                        let (ordinal, result) = event.parts();
2884                        (ordinal, handle, result)
2885                    } else {
2886                        log::trace!(
2887                            "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
2888                            params.set
2889                        );
2890                        let (ordinal, result) = Event::None.parts();
2891                        (ordinal, 0, result)
2892                    }
2893                };
2894                let store = store.store_opaque_mut();
2895                let options = Options::new_index(store, self, params.options);
2896                let ptr = func::validate_inbounds::<(u32, u32)>(
2897                    options.memory_mut(store),
2898                    &ValRaw::u32(params.payload),
2899                )?;
2900                options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
2901                options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
2902                Ok(ordinal)
2903            }
2904            WaitableCheck::Yield => Ok(0),
2905        };
2906
2907        result
2908    }
2909
2910    /// Implements the `subtask.cancel` intrinsic.
2911    pub(crate) fn subtask_cancel(
2912        self,
2913        store: &mut dyn VMStore,
2914        caller_instance: RuntimeComponentInstanceIndex,
2915        async_: bool,
2916        task_id: u32,
2917    ) -> Result<u32> {
2918        self.id().get(store).check_may_leave(caller_instance)?;
2919        let (rep, is_host) =
2920            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
2921        let (waitable, expected_caller_instance) = if is_host {
2922            let id = TableId::<HostTask>::new(rep);
2923            (
2924                Waitable::Host(id),
2925                self.concurrent_state_mut(store)
2926                    .get_mut(id)?
2927                    .caller_instance,
2928            )
2929        } else {
2930            let id = TableId::<GuestTask>::new(rep);
2931            if let Caller::Guest { instance, .. } =
2932                &self.concurrent_state_mut(store).get_mut(id)?.caller
2933            {
2934                (Waitable::Guest(id), *instance)
2935            } else {
2936                unreachable!()
2937            }
2938        };
2939        // Since waitables can neither be passed between instances nor forged,
2940        // this should never fail unless there's a bug in Wasmtime, but we check
2941        // here to be sure:
2942        assert_eq!(expected_caller_instance, caller_instance);
2943
2944        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
2945
2946        let concurrent_state = self.concurrent_state_mut(store);
2947        if let Waitable::Host(host_task) = waitable {
2948            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
2949                handle.abort();
2950                return Ok(Status::ReturnCancelled as u32);
2951            }
2952        } else {
2953            let caller = concurrent_state.guest_task.unwrap();
2954            let guest_task = TableId::<GuestTask>::new(rep);
2955            let task = concurrent_state.get_mut(guest_task)?;
2956            if task.lower_params.is_some() {
2957                task.lower_params = None;
2958                task.lift_result = None;
2959
2960                // Not yet started; cancel and remove from pending
2961                let callee_instance = task.instance;
2962
2963                let kind = concurrent_state
2964                    .instance_state(callee_instance)
2965                    .pending
2966                    .remove(&guest_task);
2967
2968                if kind.is_none() {
2969                    bail!("`subtask.cancel` called after terminal status delivered");
2970                }
2971
2972                return Ok(Status::StartCancelled as u32);
2973            } else if task.lift_result.is_some() {
2974                // Started, but not yet returned or cancelled; send the
2975                // `CANCELLED` event
2976                task.cancel_sent = true;
2977                // Note that this might overwrite an event that was set earlier
2978                // (e.g. `Event::None` if the task is yielding, or
2979                // `Event::Cancelled` if it was already cancelled), but that's
2980                // okay -- this should supersede the previous state.
2981                task.event = Some(Event::Cancelled);
2982                if let Some(set) = task.wake_on_cancel.take() {
2983                    let item = match concurrent_state
2984                        .get_mut(set)?
2985                        .waiting
2986                        .remove(&guest_task)
2987                        .unwrap()
2988                    {
2989                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
2990                        WaitMode::Callback => WorkItem::GuestCall(GuestCall {
2991                            task: guest_task,
2992                            kind: GuestCallKind::DeliverEvent { set: None },
2993                        }),
2994                    };
2995                    concurrent_state.push_high_priority(item);
2996
2997                    self.suspend(store, SuspendReason::Yielding { task: caller })?;
2998                }
2999
3000                let concurrent_state = self.concurrent_state_mut(store);
3001                let task = concurrent_state.get_mut(guest_task)?;
3002                if task.lift_result.is_some() {
3003                    // Still not yet returned or cancelled; if `async_`, return
3004                    // `BLOCKED`; otherwise wait
3005                    if async_ {
3006                        return Ok(BLOCKED);
3007                    } else {
3008                        self.wait_for_event(store, Waitable::Guest(guest_task))?;
3009                    }
3010                }
3011            }
3012        }
3013
3014        let event = waitable.take_event(self.concurrent_state_mut(store))?;
3015        if let Some(Event::Subtask {
3016            status: status @ (Status::Returned | Status::ReturnCancelled),
3017        }) = event
3018        {
3019            Ok(status as u32)
3020        } else {
3021            bail!("`subtask.cancel` called after terminal status delivered");
3022        }
3023    }
3024
3025    fn wait_for_event(self, store: &mut dyn VMStore, waitable: Waitable) -> Result<()> {
3026        let state = self.concurrent_state_mut(store);
3027        let caller = state.guest_task.unwrap();
3028        let old_set = waitable.common(state)?.set;
3029        let set = state.get_mut(caller)?.sync_call_set;
3030        waitable.join(state, Some(set))?;
3031        self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
3032        let state = self.concurrent_state_mut(store);
3033        waitable.join(state, old_set)
3034    }
3035
3036    /// Configures TLS state so `store` will be available via `tls::get` within
3037    /// the closure `f` provided.
3038    ///
3039    /// This is used to ensure that `Future::poll`, which doesn't take a `store`
3040    /// parameter, is able to get access to the `store` during future poll
3041    /// methods.
3042    fn set_tls<R>(self, store: &mut dyn VMStore, f: impl FnOnce() -> R) -> R {
3043        struct Reset<'a>(&'a mut dyn VMStore, Option<ComponentInstanceId>);
3044
3045        impl Drop for Reset<'_> {
3046            fn drop(&mut self) {
3047                self.0.concurrent_async_state_mut().current_instance = self.1;
3048            }
3049        }
3050        let prev = mem::replace(
3051            &mut store.concurrent_async_state_mut().current_instance,
3052            Some(self.id().instance()),
3053        );
3054        let reset = Reset(store, prev);
3055
3056        tls::set(reset.0, f)
3057    }
3058
3059    /// Convenience function to reduce boilerplate.
3060    pub(crate) fn concurrent_state_mut<'a>(
3061        &self,
3062        store: &'a mut StoreOpaque,
3063    ) -> &'a mut ConcurrentState {
3064        self.id().get_mut(store).concurrent_state_mut()
3065    }
3066
3067    pub(crate) fn context_get(
3068        self,
3069        store: &mut dyn VMStore,
3070        caller: RuntimeComponentInstanceIndex,
3071        slot: u32,
3072    ) -> Result<u32> {
3073        self.id().get(store).check_may_leave(caller)?;
3074        self.concurrent_state_mut(store).context_get(slot)
3075    }
3076
3077    pub(crate) fn context_set(
3078        self,
3079        store: &mut dyn VMStore,
3080        caller: RuntimeComponentInstanceIndex,
3081        slot: u32,
3082        value: u32,
3083    ) -> Result<()> {
3084        self.id().get(store).check_may_leave(caller)?;
3085        self.concurrent_state_mut(store).context_set(slot, value)
3086    }
3087}
3088
3089/// Trait representing component model ABI async intrinsics and fused adapter
3090/// helper functions.
3091///
3092/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3093/// which must be valid for at least the duration of the call (and possibly for
3094/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3095/// pointers used for async calls).
3096pub trait VMComponentAsyncStore {
3097    /// A helper function for fused adapter modules involving calls where the
3098    /// one of the caller or callee is async.
3099    ///
3100    /// This helper is not used when the caller and callee both use the sync
3101    /// ABI, only when at least one is async is this used.
3102    unsafe fn prepare_call(
3103        &mut self,
3104        instance: Instance,
3105        memory: *mut VMMemoryDefinition,
3106        start: *mut VMFuncRef,
3107        return_: *mut VMFuncRef,
3108        caller_instance: RuntimeComponentInstanceIndex,
3109        callee_instance: RuntimeComponentInstanceIndex,
3110        task_return_type: TypeTupleIndex,
3111        string_encoding: u8,
3112        result_count: u32,
3113        storage: *mut ValRaw,
3114        storage_len: usize,
3115    ) -> Result<()>;
3116
3117    /// A helper function for fused adapter modules involving calls where the
3118    /// caller is sync-lowered but the callee is async-lifted.
3119    unsafe fn sync_start(
3120        &mut self,
3121        instance: Instance,
3122        callback: *mut VMFuncRef,
3123        callee: *mut VMFuncRef,
3124        param_count: u32,
3125        storage: *mut MaybeUninit<ValRaw>,
3126        storage_len: usize,
3127    ) -> Result<()>;
3128
3129    /// A helper function for fused adapter modules involving calls where the
3130    /// caller is async-lowered.
3131    unsafe fn async_start(
3132        &mut self,
3133        instance: Instance,
3134        callback: *mut VMFuncRef,
3135        post_return: *mut VMFuncRef,
3136        callee: *mut VMFuncRef,
3137        param_count: u32,
3138        result_count: u32,
3139        flags: u32,
3140    ) -> Result<u32>;
3141
3142    /// The `future.write` intrinsic.
3143    fn future_write(
3144        &mut self,
3145        instance: Instance,
3146        caller: RuntimeComponentInstanceIndex,
3147        ty: TypeFutureTableIndex,
3148        options: OptionsIndex,
3149        future: u32,
3150        address: u32,
3151    ) -> Result<u32>;
3152
3153    /// The `future.read` intrinsic.
3154    fn future_read(
3155        &mut self,
3156        instance: Instance,
3157        caller: RuntimeComponentInstanceIndex,
3158        ty: TypeFutureTableIndex,
3159        options: OptionsIndex,
3160        future: u32,
3161        address: u32,
3162    ) -> Result<u32>;
3163
3164    /// The `future.drop-writable` intrinsic.
3165    fn future_drop_writable(
3166        &mut self,
3167        instance: Instance,
3168        caller: RuntimeComponentInstanceIndex,
3169        ty: TypeFutureTableIndex,
3170        writer: u32,
3171    ) -> Result<()>;
3172
3173    /// The `stream.write` intrinsic.
3174    fn stream_write(
3175        &mut self,
3176        instance: Instance,
3177        caller: RuntimeComponentInstanceIndex,
3178        ty: TypeStreamTableIndex,
3179        options: OptionsIndex,
3180        stream: u32,
3181        address: u32,
3182        count: u32,
3183    ) -> Result<u32>;
3184
3185    /// The `stream.read` intrinsic.
3186    fn stream_read(
3187        &mut self,
3188        instance: Instance,
3189        caller: RuntimeComponentInstanceIndex,
3190        ty: TypeStreamTableIndex,
3191        options: OptionsIndex,
3192        stream: u32,
3193        address: u32,
3194        count: u32,
3195    ) -> Result<u32>;
3196
3197    /// The "fast-path" implementation of the `stream.write` intrinsic for
3198    /// "flat" (i.e. memcpy-able) payloads.
3199    fn flat_stream_write(
3200        &mut self,
3201        instance: Instance,
3202        caller: RuntimeComponentInstanceIndex,
3203        ty: TypeStreamTableIndex,
3204        options: OptionsIndex,
3205        payload_size: u32,
3206        payload_align: u32,
3207        stream: u32,
3208        address: u32,
3209        count: u32,
3210    ) -> Result<u32>;
3211
3212    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3213    /// (i.e. memcpy-able) payloads.
3214    fn flat_stream_read(
3215        &mut self,
3216        instance: Instance,
3217        caller: RuntimeComponentInstanceIndex,
3218        ty: TypeStreamTableIndex,
3219        options: OptionsIndex,
3220        payload_size: u32,
3221        payload_align: u32,
3222        stream: u32,
3223        address: u32,
3224        count: u32,
3225    ) -> Result<u32>;
3226
3227    /// The `stream.drop-writable` intrinsic.
3228    fn stream_drop_writable(
3229        &mut self,
3230        instance: Instance,
3231        caller: RuntimeComponentInstanceIndex,
3232        ty: TypeStreamTableIndex,
3233        writer: u32,
3234    ) -> Result<()>;
3235
3236    /// The `error-context.debug-message` intrinsic.
3237    fn error_context_debug_message(
3238        &mut self,
3239        instance: Instance,
3240        caller: RuntimeComponentInstanceIndex,
3241        ty: TypeComponentLocalErrorContextTableIndex,
3242        options: OptionsIndex,
3243        err_ctx_handle: u32,
3244        debug_msg_address: u32,
3245    ) -> Result<()>;
3246}
3247
3248/// SAFETY: See trait docs.
3249impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3250    unsafe fn prepare_call(
3251        &mut self,
3252        instance: Instance,
3253        memory: *mut VMMemoryDefinition,
3254        start: *mut VMFuncRef,
3255        return_: *mut VMFuncRef,
3256        caller_instance: RuntimeComponentInstanceIndex,
3257        callee_instance: RuntimeComponentInstanceIndex,
3258        task_return_type: TypeTupleIndex,
3259        string_encoding: u8,
3260        result_count_or_max_if_async: u32,
3261        storage: *mut ValRaw,
3262        storage_len: usize,
3263    ) -> Result<()> {
3264        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3265        // this method will have ensured that `storage` is a valid
3266        // pointer containing at least `storage_len` items.
3267        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3268
3269        unsafe {
3270            instance.prepare_call(
3271                StoreContextMut(self),
3272                start,
3273                return_,
3274                caller_instance,
3275                callee_instance,
3276                task_return_type,
3277                memory,
3278                string_encoding,
3279                match result_count_or_max_if_async {
3280                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3281                        params,
3282                        has_result: false,
3283                    },
3284                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3285                        params,
3286                        has_result: true,
3287                    },
3288                    result_count => CallerInfo::Sync {
3289                        params,
3290                        result_count,
3291                    },
3292                },
3293            )
3294        }
3295    }
3296
3297    unsafe fn sync_start(
3298        &mut self,
3299        instance: Instance,
3300        callback: *mut VMFuncRef,
3301        callee: *mut VMFuncRef,
3302        param_count: u32,
3303        storage: *mut MaybeUninit<ValRaw>,
3304        storage_len: usize,
3305    ) -> Result<()> {
3306        unsafe {
3307            instance
3308                .start_call(
3309                    StoreContextMut(self),
3310                    callback,
3311                    ptr::null_mut(),
3312                    callee,
3313                    param_count,
3314                    1,
3315                    START_FLAG_ASYNC_CALLEE,
3316                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3317                    // this method will have ensured that `storage` is a valid
3318                    // pointer containing at least `storage_len` items.
3319                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3320                )
3321                .map(drop)
3322        }
3323    }
3324
3325    unsafe fn async_start(
3326        &mut self,
3327        instance: Instance,
3328        callback: *mut VMFuncRef,
3329        post_return: *mut VMFuncRef,
3330        callee: *mut VMFuncRef,
3331        param_count: u32,
3332        result_count: u32,
3333        flags: u32,
3334    ) -> Result<u32> {
3335        unsafe {
3336            instance.start_call(
3337                StoreContextMut(self),
3338                callback,
3339                post_return,
3340                callee,
3341                param_count,
3342                result_count,
3343                flags,
3344                None,
3345            )
3346        }
3347    }
3348
3349    fn future_write(
3350        &mut self,
3351        instance: Instance,
3352        caller: RuntimeComponentInstanceIndex,
3353        ty: TypeFutureTableIndex,
3354        options: OptionsIndex,
3355        future: u32,
3356        address: u32,
3357    ) -> Result<u32> {
3358        instance.id().get(self).check_may_leave(caller)?;
3359        instance
3360            .guest_write(
3361                StoreContextMut(self),
3362                TransmitIndex::Future(ty),
3363                options,
3364                None,
3365                future,
3366                address,
3367                1,
3368            )
3369            .map(|result| result.encode())
3370    }
3371
3372    fn future_read(
3373        &mut self,
3374        instance: Instance,
3375        caller: RuntimeComponentInstanceIndex,
3376        ty: TypeFutureTableIndex,
3377        options: OptionsIndex,
3378        future: u32,
3379        address: u32,
3380    ) -> Result<u32> {
3381        instance.id().get(self).check_may_leave(caller)?;
3382        instance
3383            .guest_read(
3384                StoreContextMut(self),
3385                TransmitIndex::Future(ty),
3386                options,
3387                None,
3388                future,
3389                address,
3390                1,
3391            )
3392            .map(|result| result.encode())
3393    }
3394
3395    fn stream_write(
3396        &mut self,
3397        instance: Instance,
3398        caller: RuntimeComponentInstanceIndex,
3399        ty: TypeStreamTableIndex,
3400        options: OptionsIndex,
3401        stream: u32,
3402        address: u32,
3403        count: u32,
3404    ) -> Result<u32> {
3405        instance.id().get(self).check_may_leave(caller)?;
3406        instance
3407            .guest_write(
3408                StoreContextMut(self),
3409                TransmitIndex::Stream(ty),
3410                options,
3411                None,
3412                stream,
3413                address,
3414                count,
3415            )
3416            .map(|result| result.encode())
3417    }
3418
3419    fn stream_read(
3420        &mut self,
3421        instance: Instance,
3422        caller: RuntimeComponentInstanceIndex,
3423        ty: TypeStreamTableIndex,
3424        options: OptionsIndex,
3425        stream: u32,
3426        address: u32,
3427        count: u32,
3428    ) -> Result<u32> {
3429        instance.id().get(self).check_may_leave(caller)?;
3430        instance
3431            .guest_read(
3432                StoreContextMut(self),
3433                TransmitIndex::Stream(ty),
3434                options,
3435                None,
3436                stream,
3437                address,
3438                count,
3439            )
3440            .map(|result| result.encode())
3441    }
3442
3443    fn future_drop_writable(
3444        &mut self,
3445        instance: Instance,
3446        caller: RuntimeComponentInstanceIndex,
3447        ty: TypeFutureTableIndex,
3448        writer: u32,
3449    ) -> Result<()> {
3450        instance.id().get(self).check_may_leave(caller)?;
3451        instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Future(ty), writer)
3452    }
3453
3454    fn flat_stream_write(
3455        &mut self,
3456        instance: Instance,
3457        caller: RuntimeComponentInstanceIndex,
3458        ty: TypeStreamTableIndex,
3459        options: OptionsIndex,
3460        payload_size: u32,
3461        payload_align: u32,
3462        stream: u32,
3463        address: u32,
3464        count: u32,
3465    ) -> Result<u32> {
3466        instance.id().get(self).check_may_leave(caller)?;
3467        instance
3468            .guest_write(
3469                StoreContextMut(self),
3470                TransmitIndex::Stream(ty),
3471                options,
3472                Some(FlatAbi {
3473                    size: payload_size,
3474                    align: payload_align,
3475                }),
3476                stream,
3477                address,
3478                count,
3479            )
3480            .map(|result| result.encode())
3481    }
3482
3483    fn flat_stream_read(
3484        &mut self,
3485        instance: Instance,
3486        caller: RuntimeComponentInstanceIndex,
3487        ty: TypeStreamTableIndex,
3488        options: OptionsIndex,
3489        payload_size: u32,
3490        payload_align: u32,
3491        stream: u32,
3492        address: u32,
3493        count: u32,
3494    ) -> Result<u32> {
3495        instance.id().get(self).check_may_leave(caller)?;
3496        instance
3497            .guest_read(
3498                StoreContextMut(self),
3499                TransmitIndex::Stream(ty),
3500                options,
3501                Some(FlatAbi {
3502                    size: payload_size,
3503                    align: payload_align,
3504                }),
3505                stream,
3506                address,
3507                count,
3508            )
3509            .map(|result| result.encode())
3510    }
3511
3512    fn stream_drop_writable(
3513        &mut self,
3514        instance: Instance,
3515        caller: RuntimeComponentInstanceIndex,
3516        ty: TypeStreamTableIndex,
3517        writer: u32,
3518    ) -> Result<()> {
3519        instance.id().get(self).check_may_leave(caller)?;
3520        instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Stream(ty), writer)
3521    }
3522
3523    fn error_context_debug_message(
3524        &mut self,
3525        instance: Instance,
3526        caller: RuntimeComponentInstanceIndex,
3527        ty: TypeComponentLocalErrorContextTableIndex,
3528        options: OptionsIndex,
3529        err_ctx_handle: u32,
3530        debug_msg_address: u32,
3531    ) -> Result<()> {
3532        instance.id().get(self).check_may_leave(caller)?;
3533        instance.error_context_debug_message(
3534            StoreContextMut(self),
3535            ty,
3536            options,
3537            err_ctx_handle,
3538            debug_msg_address,
3539        )
3540    }
3541}
3542
3543type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3544
3545/// Represents the state of a pending host task.
3546struct HostTask {
3547    common: WaitableCommon,
3548    caller_instance: RuntimeComponentInstanceIndex,
3549    join_handle: Option<JoinHandle>,
3550}
3551
3552impl HostTask {
3553    fn new(
3554        caller_instance: RuntimeComponentInstanceIndex,
3555        join_handle: Option<JoinHandle>,
3556    ) -> Self {
3557        Self {
3558            common: WaitableCommon::default(),
3559            caller_instance,
3560            join_handle,
3561        }
3562    }
3563}
3564
3565impl TableDebug for HostTask {
3566    fn type_name() -> &'static str {
3567        "HostTask"
3568    }
3569}
3570
3571type CallbackFn = Box<
3572    dyn Fn(&mut dyn VMStore, Instance, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3573        + Send
3574        + Sync
3575        + 'static,
3576>;
3577
3578/// Represents the caller of a given guest task.
3579enum Caller {
3580    /// The host called the guest task.
3581    Host {
3582        /// If present, may be used to deliver the result.
3583        tx: Option<oneshot::Sender<LiftedResult>>,
3584        /// Channel to notify once all subtasks spawned by this caller have
3585        /// completed.
3586        ///
3587        /// Note that we'll never actually send anything to this channel;
3588        /// dropping it when the refcount goes to zero is sufficient to notify
3589        /// the receiver.
3590        exit_tx: Arc<oneshot::Sender<()>>,
3591        /// If true, remove the task from the concurrent state that owns it
3592        /// automatically after it completes.
3593        remove_task_automatically: bool,
3594        /// If true, call `post-return` function (if any) automatically.
3595        call_post_return_automatically: bool,
3596    },
3597    /// Another guest task called the guest task
3598    Guest {
3599        /// The id of the caller
3600        task: TableId<GuestTask>,
3601        /// The instance to use to enforce reentrance rules.
3602        ///
3603        /// Note that this might not be the same as the instance the caller task
3604        /// started executing in given that one or more synchronous guest->guest
3605        /// calls may have occurred involving multiple instances.
3606        instance: RuntimeComponentInstanceIndex,
3607    },
3608}
3609
3610/// Represents a closure and related canonical ABI parameters required to
3611/// validate a `task.return` call at runtime and lift the result.
3612struct LiftResult {
3613    lift: RawLift,
3614    ty: TypeTupleIndex,
3615    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3616    string_encoding: StringEncoding,
3617}
3618
3619/// Represents a pending guest task.
3620struct GuestTask {
3621    /// See `WaitableCommon`
3622    common: WaitableCommon,
3623    /// Closure to lower the parameters passed to this task.
3624    lower_params: Option<RawLower>,
3625    /// See `LiftResult`
3626    lift_result: Option<LiftResult>,
3627    /// A place to stash the type-erased lifted result if it can't be delivered
3628    /// immediately.
3629    result: Option<LiftedResult>,
3630    /// Closure to call the callback function for an async-lifted export, if
3631    /// provided.
3632    callback: Option<CallbackFn>,
3633    /// See `Caller`
3634    caller: Caller,
3635    /// A place to stash the call context for managing resource borrows while
3636    /// switching between guest tasks.
3637    call_context: Option<CallContext>,
3638    /// A place to stash the lowered result for a sync-to-async call until it
3639    /// can be returned to the caller.
3640    sync_result: Option<Option<ValRaw>>,
3641    /// Whether or not the task has been cancelled (i.e. whether the task is
3642    /// permitted to call `task.cancel`).
3643    cancel_sent: bool,
3644    /// Whether or not we've sent a `Status::Starting` event to any current or
3645    /// future waiters for this waitable.
3646    starting_sent: bool,
3647    /// Context-local state used to implement the `context.{get,set}`
3648    /// intrinsics.
3649    context: [u32; 2],
3650    /// Pending guest subtasks created by this task (directly or indirectly).
3651    ///
3652    /// This is used to re-parent subtasks which are still running when their
3653    /// parent task is disposed.
3654    subtasks: HashSet<TableId<GuestTask>>,
3655    /// Scratch waitable set used to watch subtasks during synchronous calls.
3656    sync_call_set: TableId<WaitableSet>,
3657    /// The instance to which the exported function for this guest task belongs.
3658    ///
3659    /// Note that the task may do a sync->sync call via a fused adapter which
3660    /// results in that task executing code in a different instance, and it may
3661    /// call host functions and intrinsics from that other instance.
3662    instance: RuntimeComponentInstanceIndex,
3663    /// If present, a pending `Event::None` or `Event::Cancelled` to be
3664    /// delivered to this task.
3665    event: Option<Event>,
3666    /// If present, indicates that the task is currently waiting on the
3667    /// specified set but may be cancelled and woken immediately.
3668    wake_on_cancel: Option<TableId<WaitableSet>>,
3669    /// The `ExportIndex` of the guest function being called, if known.
3670    function_index: Option<ExportIndex>,
3671    /// Whether or not the task has exited.
3672    exited: bool,
3673}
3674
3675impl GuestTask {
3676    fn new(
3677        state: &mut ConcurrentState,
3678        lower_params: RawLower,
3679        lift_result: LiftResult,
3680        caller: Caller,
3681        callback: Option<CallbackFn>,
3682        component_instance: RuntimeComponentInstanceIndex,
3683    ) -> Result<Self> {
3684        let sync_call_set = state.push(WaitableSet::default())?;
3685
3686        Ok(Self {
3687            common: WaitableCommon::default(),
3688            lower_params: Some(lower_params),
3689            lift_result: Some(lift_result),
3690            result: None,
3691            callback,
3692            caller,
3693            call_context: Some(CallContext::default()),
3694            sync_result: None,
3695            cancel_sent: false,
3696            starting_sent: false,
3697            context: [0u32; 2],
3698            subtasks: HashSet::new(),
3699            sync_call_set,
3700            instance: component_instance,
3701            event: None,
3702            wake_on_cancel: None,
3703            function_index: None,
3704            exited: false,
3705        })
3706    }
3707
3708    /// Dispose of this guest task, reparenting any pending subtasks to the
3709    /// caller.
3710    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3711        // If there are not-yet-delivered completion events for subtasks in
3712        // `self.sync_call_set`, recursively dispose of those subtasks as well.
3713        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3714            if let Some(Event::Subtask {
3715                status: Status::Returned | Status::ReturnCancelled,
3716            }) = waitable.common(state)?.event
3717            {
3718                waitable.delete_from(state)?;
3719            }
3720        }
3721
3722        state.delete(self.sync_call_set)?;
3723
3724        // Reparent any pending subtasks to the caller.
3725        match &self.caller {
3726            Caller::Guest {
3727                task,
3728                instance: runtime_instance,
3729            } => {
3730                let task_mut = state.get_mut(*task)?;
3731                let present = task_mut.subtasks.remove(&me);
3732                assert!(present);
3733
3734                for subtask in &self.subtasks {
3735                    task_mut.subtasks.insert(*subtask);
3736                }
3737
3738                for subtask in &self.subtasks {
3739                    state.get_mut(*subtask)?.caller = Caller::Guest {
3740                        task: *task,
3741                        instance: *runtime_instance,
3742                    };
3743                }
3744            }
3745            Caller::Host { exit_tx, .. } => {
3746                for subtask in &self.subtasks {
3747                    state.get_mut(*subtask)?.caller = Caller::Host {
3748                        tx: None,
3749                        // Clone `exit_tx` to ensure that it is only dropped
3750                        // once all transitive subtasks of the host call have
3751                        // exited:
3752                        exit_tx: exit_tx.clone(),
3753                        remove_task_automatically: true,
3754                        call_post_return_automatically: true,
3755                    };
3756                }
3757            }
3758        }
3759
3760        Ok(())
3761    }
3762
3763    fn call_post_return_automatically(&self) -> bool {
3764        matches!(
3765            self.caller,
3766            Caller::Guest { .. }
3767                | Caller::Host {
3768                    call_post_return_automatically: true,
3769                    ..
3770                }
3771        )
3772    }
3773}
3774
3775impl TableDebug for GuestTask {
3776    fn type_name() -> &'static str {
3777        "GuestTask"
3778    }
3779}
3780
3781/// Represents state common to all kinds of waitables.
3782#[derive(Default)]
3783struct WaitableCommon {
3784    /// The currently pending event for this waitable, if any.
3785    event: Option<Event>,
3786    /// The set to which this waitable belongs, if any.
3787    set: Option<TableId<WaitableSet>>,
3788    /// The handle with which the guest refers to this waitable, if any.
3789    handle: Option<u32>,
3790}
3791
/// Represents a Component Model Async `waitable`.
///
/// Each variant wraps the table id of the underlying entry.  `Ord` is derived
/// so that values can be kept in the `BTreeSet` used by `WaitableSet::ready`;
/// the derived ordering compares variants in declaration order first, so
/// reordering the variants changes that set's iteration order.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
3802
3803impl Waitable {
3804    /// Retrieve the `Waitable` corresponding to the specified guest-visible
3805    /// handle.
3806    fn from_instance(
3807        state: Pin<&mut ComponentInstance>,
3808        caller_instance: RuntimeComponentInstanceIndex,
3809        waitable: u32,
3810    ) -> Result<Self> {
3811        use crate::runtime::vm::component::Waitable;
3812
3813        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
3814
3815        Ok(match kind {
3816            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
3817            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
3818            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
3819        })
3820    }
3821
3822    /// Retrieve the host-visible identifier for this `Waitable`.
3823    fn rep(&self) -> u32 {
3824        match self {
3825            Self::Host(id) => id.rep(),
3826            Self::Guest(id) => id.rep(),
3827            Self::Transmit(id) => id.rep(),
3828        }
3829    }
3830
3831    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
3832    /// remove it from any set it may currently belong to (when `set` is
3833    /// `None`).
3834    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3835        log::trace!("waitable {self:?} join set {set:?}",);
3836
3837        let old = mem::replace(&mut self.common(state)?.set, set);
3838
3839        if let Some(old) = old {
3840            match *self {
3841                Waitable::Host(id) => state.remove_child(id, old),
3842                Waitable::Guest(id) => state.remove_child(id, old),
3843                Waitable::Transmit(id) => state.remove_child(id, old),
3844            }?;
3845
3846            state.get_mut(old)?.ready.remove(self);
3847        }
3848
3849        if let Some(set) = set {
3850            match *self {
3851                Waitable::Host(id) => state.add_child(id, set),
3852                Waitable::Guest(id) => state.add_child(id, set),
3853                Waitable::Transmit(id) => state.add_child(id, set),
3854            }?;
3855
3856            if self.common(state)?.event.is_some() {
3857                self.mark_ready(state)?;
3858            }
3859        }
3860
3861        Ok(())
3862    }
3863
3864    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
3865    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3866        Ok(match self {
3867            Self::Host(id) => &mut state.get_mut(*id)?.common,
3868            Self::Guest(id) => &mut state.get_mut(*id)?.common,
3869            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3870        })
3871    }
3872
3873    /// Set or clear the pending event for this waitable and either deliver it
3874    /// to the first waiter, if any, or mark it as ready to be delivered to the
3875    /// next waiter that arrives.
3876    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3877        log::trace!("set event for {self:?}: {event:?}");
3878        self.common(state)?.event = event;
3879        self.mark_ready(state)
3880    }
3881
3882    /// Take the pending event from this waitable, leaving `None` in its place.
3883    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
3884        let common = self.common(state)?;
3885        let event = common.event.take();
3886        if let Some(set) = self.common(state)?.set {
3887            state.get_mut(set)?.ready.remove(self);
3888        }
3889        Ok(event)
3890    }
3891
3892    /// Deliver the current event for this waitable to the first waiter, if any,
3893    /// or else mark it as ready to be delivered to the next waiter that
3894    /// arrives.
3895    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
3896        if let Some(set) = self.common(state)?.set {
3897            state.get_mut(set)?.ready.insert(*self);
3898            if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
3899                let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
3900                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
3901
3902                let item = match mode {
3903                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3904                    WaitMode::Callback => WorkItem::GuestCall(GuestCall {
3905                        task,
3906                        kind: GuestCallKind::DeliverEvent { set: Some(set) },
3907                    }),
3908                };
3909                state.push_high_priority(item);
3910            }
3911        }
3912        Ok(())
3913    }
3914
    /// Handle the imminent delivery of the specified event, e.g. by updating
    /// the state of the stream or future.
    ///
    /// Only future/stream read and write events that carry a `pending`
    /// `(type, handle)` pair are acted on; every other event is ignored.  For
    /// those events the guest-table entry for the handle is moved from `Busy`
    /// to `Read`/`Write`.
    ///
    /// # Panics
    ///
    /// Panics if the handle lookup fails, if the looked-up rep differs from
    /// `self.rep()`, or if the entry is not currently `Busy`.
    fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
        match event {
            Event::FutureRead {
                pending: Some((ty, handle)),
                ..
            }
            | Event::FutureWrite {
                pending: Some((ty, handle)),
                ..
            } => {
                // The table to consult is the one belonging to the instance
                // recorded for this future type.
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.guest_tables().0[runtime_instance]
                    .future_rep(ty, handle)
                    .unwrap();
                assert_eq!(rep, self.rep());
                assert_eq!(*state, TransmitLocalState::Busy);
                // Futures always transition with `done: false` here.
                *state = match event {
                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
                    _ => unreachable!(),
                };
            }
            Event::StreamRead {
                pending: Some((ty, handle)),
                code,
            }
            | Event::StreamWrite {
                pending: Some((ty, handle)),
                code,
            } => {
                let runtime_instance = instance.component().types()[ty].instance;
                let (rep, state) = instance.guest_tables().0[runtime_instance]
                    .stream_rep(ty, handle)
                    .unwrap();
                assert_eq!(rep, self.rep());
                assert_eq!(*state, TransmitLocalState::Busy);
                // A stream end is `done` exactly when the return code is
                // `ReturnCode::Dropped`.
                let done = matches!(code, ReturnCode::Dropped(_));
                *state = match event {
                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
                    _ => unreachable!(),
                };
            }
            _ => {}
        }
    }
3963
3964    /// Remove this waitable from the instance's rep table.
3965    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
3966        match self {
3967            Self::Host(task) => {
3968                log::trace!("delete host task {task:?}");
3969                state.delete(*task)?;
3970            }
3971            Self::Guest(task) => {
3972                log::trace!("delete guest task {task:?}");
3973                state.delete(*task)?.dispose(state, *task)?;
3974            }
3975            Self::Transmit(task) => {
3976                state.delete(*task)?;
3977            }
3978        }
3979
3980        Ok(())
3981    }
3982}
3983
3984impl fmt::Debug for Waitable {
3985    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3986        match self {
3987            Self::Host(id) => write!(f, "{id:?}"),
3988            Self::Guest(id) => write!(f, "{id:?}"),
3989            Self::Transmit(id) => write!(f, "{id:?}"),
3990        }
3991    }
3992}
3993
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ///
    /// Entries are added by `Waitable::mark_ready` and removed by
    /// `Waitable::take_event` and `Waitable::join`.
    ready: BTreeSet<Waitable>,
    /// Which guest tasks are currently waiting on this set, if any.
    ///
    /// `Waitable::mark_ready` wakes waiters one at a time by popping the first
    /// entry of this map (i.e. the one with the smallest key).
    waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
}
4002
/// Supplies the type name reported for `WaitableSet` through `TableDebug`.
impl TableDebug for WaitableSet {
    /// Always returns the literal `"WaitableSet"`.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4008
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure can be called at most once (`FnOnce`); it receives the store,
/// the instance, and the (possibly uninitialized) slots to lower into.
type RawLower = Box<
    dyn FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>;
4013
/// Type-erased closure to lift the result for a guest task.
///
/// The closure can be called at most once (`FnOnce`); it receives the store,
/// the instance, and the raw result values, and produces a boxed, type-erased
/// result (the same shape as `LiftedResult`).
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
        + Send
        + Sync,
>;
4020
/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
///
/// `queue_call` performs that downcast for host callers.
type LiftedResult = Box<dyn Any + Send + Sync>;
4025
/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
///
/// NOTE(review): `LiftFn` is not defined in this part of the file; presumably
/// it refers to the result-lifting closure (see `RawLift`) -- confirm.
struct DummyResult;
4029
4030/// Represents the state of a currently executing fiber which has been resumed
4031/// via `self::poll_fn`.
4032pub(crate) struct AsyncState {
4033    /// The current instance being polled, if any, which is used to perform
4034    /// checks to ensure that futures are always polled within the correct
4035    /// instance.
4036    current_instance: Option<ComponentInstanceId>,
4037}
4038
4039impl Default for AsyncState {
4040    fn default() -> Self {
4041        Self {
4042            current_instance: None,
4043        }
4044    }
4045}
4046
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
struct InstanceState {
    /// Backpressure counter for this instance; backpressure is enabled
    /// whenever this is greater than zero.
    backpressure: u16,
    /// Whether this instance is currently barred from being entered (set by
    /// `ConcurrentState::enter_instance`, cleared by
    /// `ConcurrentState::exit_instance`).
    do_not_enter: bool,
    /// Pending calls for this instance which require `Self::backpressure` to
    /// be zero (i.e. disabled) and/or `Self::do_not_enter` to be false before
    /// they can proceed.  See `GuestCall::is_ready` for the exact conditions.
    pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
}
4058
/// Represents the Component Model Async state of a top-level component instance
/// (i.e. a `super::ComponentInstance`).
pub struct ConcurrentState {
    /// The currently running guest task, if any.
    guest_task: Option<TableId<GuestTask>>,
    /// The set of pending host and background tasks, if any.
    ///
    /// See `ComponentInstance::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// Per (sub-)component instance states.
    ///
    /// See `InstanceState` for details and note that this map is lazily
    /// populated as needed.
    // TODO: this can and should be a `PrimaryMap`
    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
    /// The "high priority" work queue for this instance's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this instance's event loop.
    low_priority: Vec<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// Mirror of type information in `ComponentInstance`, placed here for
    /// convenience at the cost of an extra `Arc` clone.
    component: Component,
}
4111
4112impl ConcurrentState {
4113    pub(crate) fn new(component: &Component) -> Self {
4114        Self {
4115            guest_task: None,
4116            table: AlwaysMut::new(ResourceTable::new()),
4117            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4118            instance_states: HashMap::new(),
4119            high_priority: Vec::new(),
4120            low_priority: Vec::new(),
4121            suspend_reason: None,
4122            worker: None,
4123            worker_item: None,
4124            global_error_context_ref_counts: BTreeMap::new(),
4125            component: component.clone(),
4126        }
4127    }
4128
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.  This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Fibers are gathered from three places: waiters parked on waitable sets,
    /// the cached worker fiber, and `ResumeFiber` items in both work queues.
    /// Queued `PushFuture` items are first folded into `self.futures`, which is
    /// then moved out as a whole.  All other queued work items are dropped.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Collect fibers parked in the `waiting` map of every waitable set in
        // the table; callback-mode waiters hold no fiber and are dropped.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            }
        }

        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Drain a work queue, keeping its fibers and futures.
        let mut take_items = |list| {
            for item in mem::take(list) {
                match item {
                    WorkItem::ResumeFiber(fiber) => {
                        fibers.push(fiber);
                    }
                    WorkItem::PushFuture(future) => {
                        self.futures
                            .get_mut()
                            .as_mut()
                            .unwrap()
                            .push(future.into_inner());
                    }
                    _ => {}
                }
            }
        };

        take_items(&mut self.high_priority);
        take_items(&mut self.low_priority);

        // Hand over the whole future set last so that it includes anything
        // pushed while draining the queues above.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }
4189
4190    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4191        self.instance_states.entry(instance).or_default()
4192    }
4193
4194    fn push<V: Send + Sync + 'static>(
4195        &mut self,
4196        value: V,
4197    ) -> Result<TableId<V>, ResourceTableError> {
4198        self.table.get_mut().push(value).map(TableId::from)
4199    }
4200
4201    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4202        self.table.get_mut().get_mut(&Resource::from(id))
4203    }
4204
4205    pub fn add_child<T: 'static, U: 'static>(
4206        &mut self,
4207        child: TableId<T>,
4208        parent: TableId<U>,
4209    ) -> Result<(), ResourceTableError> {
4210        self.table
4211            .get_mut()
4212            .add_child(Resource::from(child), Resource::from(parent))
4213    }
4214
4215    pub fn remove_child<T: 'static, U: 'static>(
4216        &mut self,
4217        child: TableId<T>,
4218        parent: TableId<U>,
4219    ) -> Result<(), ResourceTableError> {
4220        self.table
4221            .get_mut()
4222            .remove_child(Resource::from(child), Resource::from(parent))
4223    }
4224
4225    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4226        self.table.get_mut().delete(Resource::from(id))
4227    }
4228
4229    fn push_future(&mut self, future: HostTaskFuture) {
4230        // Note that we can't directly push to `ConcurrentState::futures` here
4231        // since this may be called from a future that's being polled inside
4232        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4233        // so it has exclusive access while polling it.  Therefore, we push a
4234        // work item to the "high priority" queue, which will actually push to
4235        // `ConcurrentState::futures` later.
4236        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4237    }
4238
    /// Log `item` at trace level and append it to the "high priority" work
    /// queue.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }
4243
    /// Log `item` at trace level and append it to the "low priority" work
    /// queue.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push(item);
    }
4248
4249    /// Determine whether the instance associated with the specified guest task
4250    /// may be entered (i.e. is not already on the async call stack).
4251    ///
4252    /// This is an additional check on top of the "may_enter" instance flag;
4253    /// it's needed because async-lifted exports with callback functions must
4254    /// not call their own instances directly or indirectly, and due to the
4255    /// "stackless" nature of callback-enabled guest tasks this may happen even
4256    /// if there are no activation records on the stack (i.e. the "may_enter"
4257    /// field is `true`) for that instance.
4258    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4259        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4260
4261        // Walk the task tree back to the root, looking for potential
4262        // reentrance.
4263        //
4264        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4265        // such that each bit represents and instance which has been entered by
4266        // that task or an ancestor of that task, in which case this would be a
4267        // constant time check.
4268        loop {
4269            match &self.get_mut(guest_task).unwrap().caller {
4270                Caller::Host { .. } => break true,
4271                Caller::Guest { task, instance } => {
4272                    if *instance == guest_instance {
4273                        break false;
4274                    } else {
4275                        guest_task = *task;
4276                    }
4277                }
4278            }
4279        }
4280    }
4281
4282    /// Record that we're about to enter a (sub-)component instance which does
4283    /// not support more than one concurrent, stackful activation, meaning it
4284    /// cannot be entered again until the next call returns.
4285    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4286        self.instance_state(instance).do_not_enter = true;
4287    }
4288
4289    /// Record that we've exited a (sub-)component instance previously entered
4290    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
4291    /// See the documentation for the latter for details.
4292    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4293        self.instance_state(instance).do_not_enter = false;
4294        self.partition_pending(instance)
4295    }
4296
4297    /// Iterate over `InstanceState::pending`, moving any ready items into the
4298    /// "high priority" work item queue.
4299    ///
4300    /// See `GuestCall::is_ready` for details.
4301    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4302        for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4303            let call = GuestCall { task, kind };
4304            if call.is_ready(self)? {
4305                self.push_high_priority(WorkItem::GuestCall(call));
4306            } else {
4307                self.instance_state(instance)
4308                    .pending
4309                    .insert(call.task, call.kind);
4310            }
4311        }
4312
4313        Ok(())
4314    }
4315
4316    /// Implements the `backpressure.{set,inc,dec}` intrinsics.
4317    pub(crate) fn backpressure_modify(
4318        &mut self,
4319        caller_instance: RuntimeComponentInstanceIndex,
4320        modify: impl FnOnce(u16) -> Option<u16>,
4321    ) -> Result<()> {
4322        let state = self.instance_state(caller_instance);
4323        let old = state.backpressure;
4324        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4325        state.backpressure = new;
4326
4327        if old > 0 && new == 0 {
4328            // Backpressure was previously enabled and is now disabled; move any
4329            // newly-eligible guest calls to the "high priority" queue.
4330            self.partition_pending(caller_instance)?;
4331        }
4332
4333        Ok(())
4334    }
4335
4336    /// Implements the `context.get` intrinsic.
4337    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4338        let task = self.guest_task.unwrap();
4339        let val = self.get_mut(task)?.context[usize::try_from(slot).unwrap()];
4340        log::trace!("context_get {task:?} slot {slot} val {val:#x}");
4341        Ok(val)
4342    }
4343
4344    /// Implements the `context.set` intrinsic.
4345    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4346        let task = self.guest_task.unwrap();
4347        log::trace!("context_set {task:?} slot {slot} val {val:#x}");
4348        self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
4349        Ok(())
4350    }
4351
4352    fn options(&self, options: OptionsIndex) -> &CanonicalOptions {
4353        &self.component.env_component().options[options]
4354    }
4355}
4356
4357/// Provide a type hint to compiler about the shape of a parameter lower
4358/// closure.
4359fn for_any_lower<
4360    F: FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4361>(
4362    fun: F,
4363) -> F {
4364    fun
4365}
4366
4367/// Provide a type hint to compiler about the shape of a result lift closure.
4368fn for_any_lift<
4369    F: FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4370        + Send
4371        + Sync,
4372>(
4373    fun: F,
4374) -> F {
4375    fun
4376}
4377
4378/// Wrap the specified future in a `poll_fn` which asserts that the future is
4379/// only polled from the event loop of the specified `Instance`.
4380///
4381/// See `Instance::run_concurrent` for details.
4382fn checked<F: Future + Send + 'static>(
4383    instance: Instance,
4384    fut: F,
4385) -> impl Future<Output = F::Output> + Send + 'static {
4386    async move {
4387        let mut fut = pin!(fut);
4388        future::poll_fn(move |cx| {
4389            let message = "\
4390                `Future`s which depend on asynchronous component tasks, streams, or \
4391                futures to complete may only be polled from the event loop of the \
4392                instance from which they originated.  Please use \
4393                `Instance::{run_concurrent,spawn}` to poll or await them.\
4394            ";
4395            tls::try_get(|store| {
4396                let matched = match store {
4397                    tls::TryGet::Some(store) => {
4398                        let a = store.concurrent_async_state_mut().current_instance;
4399                        a == Some(instance.id().instance())
4400                    }
4401                    tls::TryGet::Taken | tls::TryGet::None => false,
4402                };
4403
4404                if !matched {
4405                    panic!("{message}")
4406                }
4407            });
4408            fut.as_mut().poll(cx)
4409        })
4410        .await
4411    }
4412}
4413
4414/// Assert that `Instance::run_concurrent` has not been called from within an
4415/// instance's event loop.
4416fn check_recursive_run() {
4417    tls::try_get(|store| {
4418        if !matches!(store, tls::TryGet::None) {
4419            panic!("Recursive `Instance::run_concurrent` calls not supported")
4420        }
4421    });
4422}
4423
/// Split a packed callback return value into `(low, high)`, where `low` is its
/// least-significant four bits and `high` is the remaining upper bits shifted
/// down by four.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
4427
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the call.
    options: OptionsIndex,
    /// NOTE(review): presumably the guest-supplied location to which the
    /// event payload is written -- confirm against `waitable_check`.
    payload: u32,
}
4436
/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
enum WaitableCheck {
    /// Presumably a `waitable-set.wait` call with its parameters -- confirm
    /// against `waitable_check`.
    Wait(WaitableCheckParams),
    /// Presumably a `waitable-set.poll` call with its parameters -- confirm
    /// against `waitable_check`.
    Poll(WaitableCheckParams),
    /// A check which carries no parameters.
    Yield,
}
4443
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// `R` is the type the lifted result is downcast to by `queue_call`.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest task created by `prepare_call`
    task: TableId<GuestTask>,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The `oneshot::Receiver` which will resolve when the task -- and any
    /// transitive subtasks -- have all exited.
    exit_rx: oneshot::Receiver<()>,
    /// Ties the expected result type `R` to this value without storing one.
    _phantom: PhantomData<R>,
}
4460
4461impl<R> PreparedCall<R> {
4462    /// Get a copy of the `TaskId` for this `PreparedCall`.
4463    pub(crate) fn task_id(&self) -> TaskId {
4464        TaskId {
4465            handle: self.handle,
4466            task: self.task,
4467        }
4468    }
4469}
4470
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// The function whose instance owns the concurrent state holding `task`.
    handle: Func,
    /// The table id of the guest task itself.
    task: TableId<GuestTask>,
}
4476
4477impl TaskId {
4478    /// Remove the specified task from the concurrent state to which it belongs.
4479    ///
4480    /// This must be used with care to avoid use-after-delete or double-delete
4481    /// bugs.  Specifically, it should only be called on tasks created with the
4482    /// `remove_task_automatically` parameter to `prepare_call` set to `false`,
4483    /// which tells the runtime that the caller is responsible for removing the
4484    /// task from the state; otherwise, it will be removed automatically.  Also,
4485    /// it should only be called once for a given task, and only after either
4486    /// the task has completed or the instance has trapped.
4487    pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4488        Waitable::Guest(self.task).delete_from(self.handle.instance().concurrent_state_mut(store.0))
4489    }
4490}
4491
4492/// Prepare a call to the specified exported Wasm function, providing functions
4493/// for lowering the parameters and lifting the result.
4494///
4495/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4496/// loop, use `queue_call`.
4497pub(crate) fn prepare_call<T, R>(
4498    mut store: StoreContextMut<T>,
4499    handle: Func,
4500    param_count: usize,
4501    remove_task_automatically: bool,
4502    call_post_return_automatically: bool,
4503    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4504    + Send
4505    + Sync
4506    + 'static,
4507    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4508    + Send
4509    + Sync
4510    + 'static,
4511) -> Result<PreparedCall<R>> {
4512    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4513
4514    let instance = handle.instance().id().get(store.0);
4515    let task_return_type = instance.component().types()[ty].results;
4516    let component_instance = raw_options.instance;
4517    let callback = options.callback();
4518    let memory = options.memory_raw().map(SendSyncPtr::new);
4519    let string_encoding = options.string_encoding();
4520    let token = StoreToken::new(store.as_context_mut());
4521    let state = handle.instance().concurrent_state_mut(store.0);
4522
4523    assert!(state.guest_task.is_none());
4524
4525    let (tx, rx) = oneshot::channel();
4526    let (exit_tx, exit_rx) = oneshot::channel();
4527
4528    let mut task = GuestTask::new(
4529        state,
4530        Box::new(for_any_lower(move |store, instance, params| {
4531            debug_assert!(instance.id() == handle.instance().id());
4532            lower_params(handle, token.as_context_mut(store), params)
4533        })),
4534        LiftResult {
4535            lift: Box::new(for_any_lift(move |store, instance, result| {
4536                debug_assert!(instance.id() == handle.instance().id());
4537                lift_result(handle, store, result)
4538            })),
4539            ty: task_return_type,
4540            memory,
4541            string_encoding,
4542        },
4543        Caller::Host {
4544            tx: Some(tx),
4545            exit_tx: Arc::new(exit_tx),
4546            remove_task_automatically,
4547            call_post_return_automatically,
4548        },
4549        callback.map(|callback| {
4550            let callback = SendSyncPtr::new(callback);
4551            Box::new(
4552                move |store: &mut dyn VMStore,
4553                      instance: Instance,
4554                      runtime_instance,
4555                      event,
4556                      handle| {
4557                    let store = token.as_context_mut(store);
4558                    // SAFETY: Per the contract of `prepare_call`, the callback
4559                    // will remain valid at least as long as this task exists.
4560                    unsafe {
4561                        instance.call_callback(
4562                            store,
4563                            runtime_instance,
4564                            callback,
4565                            event,
4566                            handle,
4567                            call_post_return_automatically,
4568                        )
4569                    }
4570                },
4571            ) as CallbackFn
4572        }),
4573        component_instance,
4574    )?;
4575    task.function_index = Some(handle.index());
4576
4577    let task = state.push(task)?;
4578
4579    Ok(PreparedCall {
4580        handle,
4581        task,
4582        param_count,
4583        rx,
4584        exit_rx,
4585        _phantom: PhantomData,
4586    })
4587}
4588
4589/// Queue a call previously prepared using `prepare_call` to be run as part of
4590/// the associated `ComponentInstance`'s event loop.
4591///
4592/// The returned future will resolve to the result once it is available, but
4593/// must only be polled via the instance's event loop. See
4594/// `Instance::run_concurrent` for details.
4595pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4596    mut store: StoreContextMut<T>,
4597    prepared: PreparedCall<R>,
4598) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4599    let PreparedCall {
4600        handle,
4601        task,
4602        param_count,
4603        rx,
4604        exit_rx,
4605        ..
4606    } = prepared;
4607
4608    queue_call0(store.as_context_mut(), handle, task, param_count)?;
4609
4610    Ok(checked(
4611        handle.instance(),
4612        rx.map(move |result| {
4613            result
4614                .map(|v| (*v.downcast().unwrap(), exit_rx))
4615                .map_err(anyhow::Error::from)
4616        }),
4617    ))
4618}
4619
4620/// Queue a call previously prepared using `prepare_call` to be run as part of
4621/// the associated `ComponentInstance`'s event loop.
4622fn queue_call0<T: 'static>(
4623    store: StoreContextMut<T>,
4624    handle: Func,
4625    guest_task: TableId<GuestTask>,
4626    param_count: usize,
4627) -> Result<()> {
4628    let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4629    let is_concurrent = raw_options.async_;
4630    let instance = handle.instance();
4631    let callee = handle.lifted_core_func(store.0);
4632    let callback = options.callback();
4633    let post_return = handle.post_return_core_func(store.0);
4634
4635    log::trace!("queueing call {guest_task:?}");
4636
4637    let instance_flags = if callback.is_none() {
4638        None
4639    } else {
4640        Some(flags)
4641    };
4642
4643    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
4644    // (with signatures appropriate for this call) and will remain valid as
4645    // long as this instance is valid.
4646    unsafe {
4647        instance.queue_call(
4648            store,
4649            guest_task,
4650            SendSyncPtr::new(callee),
4651            param_count,
4652            1,
4653            instance_flags,
4654            is_concurrent,
4655            callback.map(SendSyncPtr::new),
4656            post_return.map(SendSyncPtr::new),
4657        )
4658    }
4659}