wasmtime/runtime/component/concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! store, allowing any and all tasks belonging to that store to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified store.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
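//!
//! As a brief, illustrative sketch (the `add` export, the `last_sum` field, and
//! the surrounding setup here are hypothetical; see
//! `StoreContextMut::run_concurrent` for a complete, compiling example):
//!
//! ```ignore
//! // Drive the store's event loop while the provided future runs.
//! store
//!     .run_concurrent(async |accessor| -> wasmtime::Result<()> {
//!         // Kick off a host->guest call; awaiting it lets other tasks run.
//!         let (sum,) = add.call_concurrent(accessor, (1, 2)).await?;
//!         // Synchronously access the store data between await points.
//!         accessor.with(|mut access| access.data_mut().last_sum = Some(sum));
//!         Ok(())
//!     })
//!     .await??;
//! ```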
52
53use crate::component::func::{self, Func};
54use crate::component::store::StoreComponentInstanceId;
55use crate::component::{
56    ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{
61    CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62};
63use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
65use anyhow::{Context as _, Result, anyhow, bail};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, Either, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::slice;
84use std::sync::Arc;
85use std::task::{Context, Poll, Waker};
86use std::vec::Vec;
87use table::{TableDebug, TableId};
88use wasmtime_environ::Trap;
89use wasmtime_environ::component::{
90    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
113/// Constant defined in the Component Model spec to indicate that the async
114/// intrinsic (e.g. `future.write`) has not yet completed.
115const BLOCKED: u32 = 0xffff_ffff;
116
117/// Corresponds to `CallState` in the upstream spec.
118#[derive(Clone, Copy, Eq, PartialEq, Debug)]
119pub enum Status {
120    Starting = 0,
121    Started = 1,
122    Returned = 2,
123    StartCancelled = 3,
124    ReturnCancelled = 4,
125}
126
127impl Status {
128    /// Packs this status and the optional `waitable` provided into a 32-bit
129    /// result that the canonical ABI requires.
130    ///
131    /// The low 4 bits are reserved for the status while the upper 28 bits are
132    /// the waitable, if present.
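    ///
    /// For example, packing `Status::Started` with waitable handle `5` yields
    /// `(5 << 4) | 1`.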
133    pub fn pack(self, waitable: Option<u32>) -> u32 {
134        assert!(matches!(self, Status::Returned) == waitable.is_none());
135        let waitable = waitable.unwrap_or(0);
136        assert!(waitable < (1 << 28));
137        (waitable << 4) | (self as u32)
138    }
139}
140
141/// Corresponds to `EventCode` in the Component Model spec, plus related payload
142/// data.
143#[derive(Clone, Copy, Debug)]
144enum Event {
145    None,
146    Cancelled,
147    Subtask {
148        status: Status,
149    },
150    StreamRead {
151        code: ReturnCode,
152        pending: Option<(TypeStreamTableIndex, u32)>,
153    },
154    StreamWrite {
155        code: ReturnCode,
156        pending: Option<(TypeStreamTableIndex, u32)>,
157    },
158    FutureRead {
159        code: ReturnCode,
160        pending: Option<(TypeFutureTableIndex, u32)>,
161    },
162    FutureWrite {
163        code: ReturnCode,
164        pending: Option<(TypeFutureTableIndex, u32)>,
165    },
166}
167
168impl Event {
169    /// Lower this event to core Wasm integers for delivery to the guest.
170    ///
171    /// Note that the waitable handle, if any, is assumed to be lowered
172    /// separately.
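    ///
    /// For example, `Event::Subtask { status: Status::Returned }` lowers to
    /// `(1, 2)`, i.e. the `EVENT_SUBTASK` discriminant plus the status.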
173    fn parts(self) -> (u32, u32) {
174        const EVENT_NONE: u32 = 0;
175        const EVENT_SUBTASK: u32 = 1;
176        const EVENT_STREAM_READ: u32 = 2;
177        const EVENT_STREAM_WRITE: u32 = 3;
178        const EVENT_FUTURE_READ: u32 = 4;
179        const EVENT_FUTURE_WRITE: u32 = 5;
180        const EVENT_CANCELLED: u32 = 6;
181        match self {
182            Event::None => (EVENT_NONE, 0),
183            Event::Cancelled => (EVENT_CANCELLED, 0),
184            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189        }
190    }
191}
192
193/// Corresponds to `CallbackCode` in the spec.
194mod callback_code {
195    pub const EXIT: u32 = 0;
196    pub const YIELD: u32 = 1;
197    pub const WAIT: u32 = 2;
198    pub const POLL: u32 = 3;
199}
200
201/// A flag indicating that the callee is an async-lowered export.
202///
203/// This may be passed to the `async-start` intrinsic from a fused adapter.
204const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
205
206/// Provides access to either store data (via the `get` method) or the store
207/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
208/// instance to which the current host task belongs.
209///
210/// See [`Accessor::with`] for details.
211pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
212    store: StoreContextMut<'a, T>,
213    get_data: fn(&mut T) -> D::Data<'_>,
214}
215
216impl<'a, T, D> Access<'a, T, D>
217where
218    D: HasData + ?Sized,
219    T: 'static,
220{
221    /// Creates a new [`Access`] from its component parts.
222    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
223        Self { store, get_data }
224    }
225
226    /// Get mutable access to the store data.
227    pub fn data_mut(&mut self) -> &mut T {
228        self.store.data_mut()
229    }
230
231    /// Get mutable access to `D::Data`, this accessor's projection of the store data.
232    pub fn get(&mut self) -> D::Data<'_> {
233        (self.get_data)(self.data_mut())
234    }
235
236    /// Spawn a background task.
237    ///
238    /// See [`Accessor::spawn`] for details.
239    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
240    where
241        T: 'static,
242    {
243        let accessor = Accessor {
244            get_data: self.get_data,
245            token: StoreToken::new(self.store.as_context_mut()),
246        };
247        self.store
248            .as_context_mut()
249            .spawn_with_accessor(accessor, task)
250    }
251
252    /// Returns the getter this accessor is using to project from `T` into
253    /// `D::Data`.
254    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
255        self.get_data
256    }
257}
258
259impl<'a, T, D> AsContext for Access<'a, T, D>
260where
261    D: HasData + ?Sized,
262    T: 'static,
263{
264    type Data = T;
265
266    fn as_context(&self) -> StoreContext<'_, T> {
267        self.store.as_context()
268    }
269}
270
271impl<'a, T, D> AsContextMut for Access<'a, T, D>
272where
273    D: HasData + ?Sized,
274    T: 'static,
275{
276    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
277        self.store.as_context_mut()
278    }
279}
280
281/// Provides scoped mutable access to store data in the context of a concurrent
282/// host task future.
283///
284/// This allows multiple host task futures to execute concurrently and access
285/// the store between (but not across) `await` points.
286///
287/// # Rationale
288///
289/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
290/// `D::Data<'_>`. The problem this is solving, however, is that it does not
291/// literally store these values. The basic problem is that when a concurrent
292/// host future is being polled it has access to `&mut T` (and the whole
293/// `Store`) but when it's not being polled it does not have access to these
294/// values. This reflects how the store is only ever polling one future at a
295/// time so the store is effectively being passed between futures.
296///
297/// Rust's `Future` trait, however, has no means of passing a `Store`
298/// temporarily between futures. The [`Context`](std::task::Context) type does
299/// not have the ability to attach arbitrary information to it at this time.
300/// This type, [`Accessor`], is used to bridge this expressivity gap.
301///
302/// The [`Accessor`] type here represents the ability to acquire, temporarily in
303/// a synchronous manner, the current store. The [`Accessor::with`] function
304/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
305/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
306    /// not take an `async` closure as its argument; instead it's a synchronous
307    /// closure which must complete during one run of `Future::poll`. This reflects
308/// how the store is temporarily made available while a host future is being
309/// polled.
310///
311/// # Implementation
312///
313/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
314/// this type additionally doesn't even have a lifetime parameter. This is
315/// instead a representation of proof of the ability to acquire these while a
316/// future is being polled. Wasmtime will, when it polls a host future,
317/// configure ambient state such that the `Accessor` that a future closes over
318/// will work and be able to access the store.
319///
320/// This has a number of implications for users such as:
321///
322    /// * It's intentional that `Accessor` cannot be cloned; it needs to stay within
323///   the lifetime of a single future.
324/// * A future is expected to, however, close over an `Accessor` and keep it
325///   alive probably for the duration of the entire future.
326/// * Different host futures will be given different `Accessor`s, and that's
327///   intentional.
328/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
329///   alleviates some otherwise required bounds to be written down.
330///
331/// # Using `Accessor` in `Drop`
332///
333/// The methods on `Accessor` are only expected to work in the context of
334/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
335/// host future can be dropped at any time throughout the system and Wasmtime
336/// store context is not necessarily available at that time. It's recommended to
337/// not use `Accessor` methods in anything connected to a `Drop` implementation
338    /// as they will panic or otherwise have unintended results. If you run into
339    /// this, though, feel free to file an issue on the Wasmtime repository.
340pub struct Accessor<T: 'static, D = HasSelf<T>>
341where
342    D: HasData + ?Sized,
343{
344    token: StoreToken<T>,
345    get_data: fn(&mut T) -> D::Data<'_>,
346}
347
348/// A helper trait to take any type of accessor-with-data in functions.
349///
350/// This trait is similar to [`AsContextMut`] except that it's used when
351/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
352/// [`Accessor`] is the main type used in concurrent settings and is passed to
353/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
354///
355/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
356/// this trait. This effectively means that regardless of the `D` in
357/// `Accessor<T, D>` it can still be passed to a function which just needs a
358/// store accessor.
359///
360/// Acquiring an [`Accessor`] can be done through
361/// [`StoreContextMut::run_concurrent`] for example or in a host function
362/// through
363/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
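///
/// For example, a helper that works with any accessor over a hypothetical
/// `Ctx` store data type might be sketched as:
///
/// ```ignore
/// async fn touch(accessor: impl AsAccessor<Data = Ctx>) -> wasmtime::Result<()> {
///     let accessor = accessor.as_accessor();
///     accessor.with(|mut access| access.data_mut().touched = true);
///     Ok(())
/// }
/// ```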
364pub trait AsAccessor {
365    /// The `T` in `Store<T>` that this accessor refers to.
366    type Data: 'static;
367
368    /// The `D` in `Accessor<T, D>`, or the projection out of
369    /// `Self::Data`.
370    type AccessorData: HasData + ?Sized;
371
372    /// Returns the accessor that this is referring to.
373    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
374}
375
376impl<T: AsAccessor + ?Sized> AsAccessor for &T {
377    type Data = T::Data;
378    type AccessorData = T::AccessorData;
379
380    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
381        T::as_accessor(self)
382    }
383}
384
385impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
386    type Data = T;
387    type AccessorData = D;
388
389    fn as_accessor(&self) -> &Accessor<T, D> {
390        self
391    }
392}
393
394// Note that it is intentional at this time that `Accessor` does not actually
395// store `&mut T` or anything similar. This distinctly enables the `Accessor`
396// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
397// that matter). This is used to ergonomically simplify bindings where the
398// majority of the time `Accessor` is closed over in a future which then needs
399// to be `Send` and `Sync`. Since `T: 'static` already has to be written
400// everywhere, it helps to at least avoid also requiring `T: Send`.
401//
402// Note as well that `Accessor` doesn't actually store its data at all. Instead
403// it's more of a "proof" of what can be accessed from TLS. API design around
404// `Accessor` and functions like `Linker::func_wrap_concurrent` are
405// intentionally made to ensure that `Accessor` is ideally only used in the
406// context that TLS variables are actually set. For example host functions are
407// given `&Accessor`, not `Accessor`, and this prevents them from persisting
408// the value outside of a future. Within the future the TLS variables are all
409// guaranteed to be set while the future is being polled.
410//
411// Finally, this is not an ironclad guarantee, nor does it need to be.
412// The TLS APIs are designed to panic or otherwise model usage where they're
413// called recursively or similar. It's hoped that code cannot be constructed to
414// actually hit this at runtime but this is not a safety requirement at this
415// time.
416const _: () = {
417    const fn assert<T: Send + Sync>() {}
418    assert::<Accessor<UnsafeCell<u32>>>();
419};
420
421impl<T> Accessor<T> {
422    /// Creates a new `Accessor` for the store identified by `token`, using the
423    /// identity function as its data projection (i.e. `get_data` simply returns
424    /// the store's `T` data).
430    pub(crate) fn new(token: StoreToken<T>) -> Self {
431        Self {
432            token,
433            get_data: |x| x,
434        }
435    }
436}
437
438impl<T, D> Accessor<T, D>
439where
440    D: HasData + ?Sized,
441{
442    /// Run the specified closure, passing it mutable access to the store.
443    ///
444    /// This function is one of the main building blocks of the [`Accessor`]
445    /// type. This yields synchronous, blocking access to the store via an
446    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
447    /// providing the ability to access `D` via [`Access::get`]. Note that the
448    /// `fun` here is given only temporary access to the store and `T`/`D`
449    /// meaning that the return value `R` here is not allowed to capture borrows
450    /// into the two. If access is needed to data within `T` or `D` outside of
451    /// this closure then it must be `clone`d out, for example.
452    ///
453    /// # Panics
454    ///
455    /// This function will panic if it is called recursively with any other
456    /// accessor already in scope. For example if `with` is called within `fun`,
457    /// then this function will panic. It is up to the embedder to ensure that
458    /// this does not happen.
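    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `accessor: &Accessor<MyData>` is in scope and
    /// `MyData` is a hypothetical store data type with a `counter: u32` field:
    ///
    /// ```ignore
    /// let next = accessor.with(|mut access| {
    ///     access.data_mut().counter += 1;
    ///     access.data_mut().counter
    /// });
    /// ```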
459    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
460        tls::get(|vmstore| {
461            fun(Access {
462                store: self.token.as_context_mut(vmstore),
463                get_data: self.get_data,
464            })
465        })
466    }
467
468    /// Returns the getter this accessor is using to project from `T` into
469    /// `D::Data`.
470    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
471        self.get_data
472    }
473
474    /// Changes this accessor to access `D2` instead of the current type
475    /// parameter `D`.
476    ///
477    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
478    ///
479    /// # Panics
480    ///
481    /// When using this API the returned value is disconnected from `&self` and
482    /// the lifetime binding the `self` argument. An `Accessor` only works
483    /// within the context of the closure or async closure that it was
484    /// originally given to, however. This means that due to the fact that the
485    /// returned value has no lifetime connection it's possible to use the
486    /// accessor outside of `&self`, the original accessor, and panic.
487    ///
488    /// The returned value should only be used within the scope of the original
489    /// `Accessor` that `self` refers to.
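    ///
    /// # Example
    ///
    /// A sketch of projecting to a single field of the store data (the `Ctx`
    /// and `HasTable` types here are illustrative, not part of this API):
    ///
    /// ```ignore
    /// struct Ctx { table: ResourceTable }
    ///
    /// struct HasTable;
    /// impl HasData for HasTable {
    ///     type Data<'a> = &'a mut ResourceTable;
    /// }
    ///
    /// fn project(accessor: &Accessor<Ctx>) -> Accessor<Ctx, HasTable> {
    ///     accessor.with_getter(|ctx| &mut ctx.table)
    /// }
    /// ```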
490    pub fn with_getter<D2: HasData>(
491        &self,
492        get_data: fn(&mut T) -> D2::Data<'_>,
493    ) -> Accessor<T, D2> {
494        Accessor {
495            token: self.token,
496            get_data,
497        }
498    }
499
500    /// Spawn a background task which will receive an `&Accessor<T, D>` and
501    /// run concurrently with any other tasks in progress for the current
502    /// store.
503    ///
504    /// This is particularly useful for host functions which return a `stream`
505    /// or `future` such that the code to write to the write end of that
506    /// `stream` or `future` must run after the function returns.
507    ///
508    /// The returned [`JoinHandle`] may be used to cancel the task.
509    ///
510    /// # Panics
511    ///
512    /// Panics if called within a closure provided to the [`Accessor::with`]
513    /// function. This can only be called outside an active invocation of
514    /// [`Accessor::with`].
515    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
516    where
517        T: 'static,
518    {
519        let accessor = self.clone_for_spawn();
520        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
521    }
522
523    fn clone_for_spawn(&self) -> Self {
524        Self {
525            token: self.token,
526            get_data: self.get_data,
527        }
528    }
529}
530
531/// Represents a task which may be provided to `Accessor::spawn`,
532/// `Accessor::forward`, or `StoreContextMut::spawn`.
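///
/// A minimal sketch of an implementation (the task type and its behavior are
/// illustrative):
///
/// ```ignore
/// struct Ping;
///
/// impl<T: 'static> AccessorTask<T> for Ping {
///     async fn run(self, accessor: &Accessor<T>) -> Result<()> {
///         accessor.with(|_access| println!("ping"));
///         Ok(())
///     }
/// }
/// ```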
533// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
534// option.
535//
536// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
537// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
538// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
539// Send + Sync + 'static` fails with a type mismatch error when we try to pass
540// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
541// best we can do for the time being.
542pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
543where
544    D: HasData + ?Sized,
545{
546    /// Run the task.
547    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
548}
549
550/// Represents parameter and result metadata for the caller side of a
551/// guest->guest call orchestrated by a fused adapter.
552enum CallerInfo {
553    /// Metadata for a call to an async-lowered import
554    Async {
555        params: Vec<ValRaw>,
556        has_result: bool,
557    },
558    /// Metadata for a call to a sync-lowered import
559    Sync {
560        params: Vec<ValRaw>,
561        result_count: u32,
562    },
563}
564
565/// Indicates how a guest task is waiting on a waitable set.
566enum WaitMode {
567    /// The guest task is waiting using `task.wait`
568    Fiber(StoreFiber<'static>),
569    /// The guest task is waiting via a callback declared as part of an
570    /// async-lifted export.
571    Callback(Instance),
572}
573
574/// Represents the reason a fiber is suspending itself.
575#[derive(Debug)]
576enum SuspendReason {
577    /// The fiber is waiting for an event to be delivered to the specified
578    /// waitable set or task.
579    Waiting {
580        set: TableId<WaitableSet>,
581        thread: QualifiedThreadId,
582        skip_may_block_check: bool,
583    },
584    /// The fiber has finished handling its most recent work item and is waiting
585    /// for another (or to be dropped if it is no longer needed).
586    NeedWork,
587    /// The fiber is yielding and should be resumed once other tasks have had a
588    /// chance to run.
589    Yielding { thread: QualifiedThreadId },
590    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
591    ExplicitlySuspending {
592        thread: QualifiedThreadId,
593        skip_may_block_check: bool,
594    },
595}
596
597/// Represents a pending call into guest code for a given guest task.
598enum GuestCallKind {
599    /// Indicates there's an event to deliver to the task, possibly related to a
600    /// waitable set the task has been waiting on or polling.
601    DeliverEvent {
602        /// The instance to which the task belongs.
603        instance: Instance,
604        /// The waitable set the event belongs to, if any.
605        ///
606        /// If this is `None` the event will be waiting in the
607        /// `GuestTask::event` field for the task.
608        set: Option<TableId<WaitableSet>>,
609    },
610    /// Indicates that a new guest task call is pending and may be executed
611    /// using the specified closure.
612    ///
613    /// If the closure returns `Ok(Some(call))`, the `call` should be run
614    /// immediately using `handle_guest_call`.
615    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
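    /// Like `StartImplicit`, except that the closure does not yield a
    /// follow-up `GuestCall` to be run.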
616    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
617}
618
619impl fmt::Debug for GuestCallKind {
620    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621        match self {
622            Self::DeliverEvent { instance, set } => f
623                .debug_struct("DeliverEvent")
624                .field("instance", instance)
625                .field("set", set)
626                .finish(),
627            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
628            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
629        }
630    }
631}
632
633/// Represents a pending call into guest code for a given guest thread.
634#[derive(Debug)]
635struct GuestCall {
636    thread: QualifiedThreadId,
637    kind: GuestCallKind,
638}
639
640impl GuestCall {
641    /// Returns whether or not the call is ready to run.
642    ///
643    /// A call will not be ready to run if either:
644    ///
645    /// - the (sub-)component instance to be called has already been entered and
646    /// cannot be reentered until an in-progress call completes
647    ///
648    /// - the call is for a not-yet-started task and the (sub-)component
649    /// instance to be called has backpressure enabled
650    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
651        let instance = store
652            .concurrent_state_mut()
653            .get_mut(self.thread.task)?
654            .instance;
655        let state = store.instance_state(instance);
656
657        let ready = match &self.kind {
658            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
659            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
660            GuestCallKind::StartExplicit(_) => true,
661        };
662        log::trace!(
663            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
664            state.do_not_enter,
665            state.backpressure
666        );
667        Ok(ready)
668    }
669}
670
671/// Job to be run on a worker fiber.
672enum WorkerItem {
673    GuestCall(GuestCall),
674    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
675}
676
677/// Represents state related to an in-progress poll operation (e.g. `task.poll`
678/// or `CallbackCode.POLL`).
679#[derive(Debug)]
680struct PollParams {
681    /// The instance to which the polling thread belongs.
682    instance: Instance,
683    /// The polling thread.
684    thread: QualifiedThreadId,
685    /// The waitable set being polled.
686    set: TableId<WaitableSet>,
687}
688
689/// Represents a pending work item to be handled by the event loop for a given
690/// component instance.
691enum WorkItem {
692    /// A host task to be pushed to `ConcurrentState::futures`.
693    PushFuture(AlwaysMut<HostTaskFuture>),
694    /// A fiber to resume.
695    ResumeFiber(StoreFiber<'static>),
696    /// A pending call into guest code for a given guest task.
697    GuestCall(GuestCall),
698    /// A pending `task.poll` or `CallbackCode.POLL` operation.
699    Poll(PollParams),
700    /// A job to run on a worker fiber.
701    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
702}
703
704impl fmt::Debug for WorkItem {
705    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
706        match self {
707            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
708            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
709            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
710            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
711            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
712        }
713    }
714}
715
716/// Whether a suspension intrinsic was cancelled or completed.
717#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
718pub(crate) enum WaitResult {
719    Cancelled,
720    Completed,
721}
722
723/// Raise a trap if the calling task is synchronous and trying to block prior to
724/// returning a value.
725pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
726    store.concurrent_state_mut().check_blocking()
727}
728
729/// Poll the specified future until it completes on behalf of a guest->host call
730/// using a sync-lowered import.
731///
732/// This is similar to `Instance::first_poll` except it's for sync-lowered
733/// imports, meaning we don't need to handle cancellation and we can block the
734/// caller until the task completes, at which point the caller can handle
735/// lowering the result to the guest's stack and linear memory.
736pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
737    store: &mut dyn VMStore,
738    future: impl Future<Output = Result<R>> + Send + 'static,
739    caller_instance: RuntimeComponentInstanceIndex,
740) -> Result<R> {
741    let state = store.concurrent_state_mut();
742
743    // If there is no current guest thread set, that means the host function was
744    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
745    // should complete immediately.
746    let Some(caller) = state.guest_thread else {
747        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
748            Poll::Ready(result) => result,
749            Poll::Pending => {
750                unreachable!()
751            }
752        };
753    };
754
755    // Save any existing result stashed in `GuestTask::result` so we can replace
756    // it with the new result.
757    let old_result = state
758        .get_mut(caller.task)
759        .with_context(|| format!("bad handle: {caller:?}"))?
760        .result
761        .take();
762
763    // Add a temporary host task into the table so we can track its progress.
764    // Note that we'll never allocate a waitable handle for the guest since
765    // we're being called synchronously.
766    let task = state.push(HostTask::new(caller_instance, None))?;
767
768    log::trace!("new host task child of {caller:?}: {task:?}");
769
770    // Wrap the future in a closure which will take care of stashing the result
771    // in `GuestTask::result` and resuming this fiber when the host task
772    // completes.
773    let mut future = Box::pin(async move {
774        let result = future.await?;
775        tls::get(move |store| {
776            let state = store.concurrent_state_mut();
777            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
778
779            Waitable::Host(task).set_event(
780                state,
781                Some(Event::Subtask {
782                    status: Status::Returned,
783                }),
784            )?;
785
786            Ok(())
787        })
788    }) as HostTaskFuture;
789
790    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
791    // add the future to `ConcurrentState::futures` and poll it automatically
792    // from the event loop if it doesn't complete immediately here.
793    let poll = tls::set(store, || {
794        future
795            .as_mut()
796            .poll(&mut Context::from_waker(&Waker::noop()))
797    });
798
799    match poll {
800        Poll::Ready(result) => {
801            // It completed immediately; check the result and delete the task.
802            result?;
803            log::trace!("delete host task {task:?} (already ready)");
804            store.concurrent_state_mut().delete(task)?;
805        }
806        Poll::Pending => {
807            // It did not complete immediately; add it to
808            // `ConcurrentState::futures` so it will be polled via the event
809            // loop; then use `GuestTask::sync_call_set` to wait for the task to
810            // complete, suspending the current fiber until it does so.
811            let state = store.concurrent_state_mut();
812            state.push_future(future);
813
814            let set = state.get_mut(caller.task)?.sync_call_set;
815            Waitable::Host(task).join(state, Some(set))?;
816
817            store.suspend(SuspendReason::Waiting {
818                set,
819                thread: caller,
820                skip_may_block_check: false,
821            })?;
822        }
823    }
824
825    // Retrieve and return the result.
826    Ok(*mem::replace(
827        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
828        old_result,
829    )
830    .unwrap()
831    .downcast()
832    .unwrap())
833}
834
835/// Execute the specified guest call.
836fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
837    let mut next = Some(call);
838    while let Some(call) = next.take() {
839        match call.kind {
840            GuestCallKind::DeliverEvent { instance, set } => {
841                let (event, waitable) = instance
842                    .get_event(store, call.thread.task, set, true)?
843                    .unwrap();
844                let state = store.concurrent_state_mut();
845                let task = state.get_mut(call.thread.task)?;
846                let runtime_instance = task.instance;
847                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
848
849                log::trace!(
850                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
851                    call.thread,
852                );
853
854                let old_thread = state.guest_thread.replace(call.thread);
855                log::trace!(
856                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
857                    call.thread
858                );
859
860                store.maybe_push_call_context(call.thread.task)?;
861
862                store.enter_instance(runtime_instance);
863
864                let callback = store
865                    .concurrent_state_mut()
866                    .get_mut(call.thread.task)?
867                    .callback
868                    .take()
869                    .unwrap();
870
871                let code = callback(store, runtime_instance.index, event, handle)?;
872
873                store
874                    .concurrent_state_mut()
875                    .get_mut(call.thread.task)?
876                    .callback = Some(callback);
877
878                store.exit_instance(runtime_instance)?;
879
880                store.maybe_pop_call_context(call.thread.task)?;
881
882                next = instance.handle_callback_code(
883                    store,
884                    call.thread,
885                    runtime_instance.index,
886                    code,
887                )?;
888
889                store.concurrent_state_mut().guest_thread = old_thread;
890                log::trace!(
891                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
892                );
893            }
894            GuestCallKind::StartImplicit(fun) => {
895                next = fun(store)?;
896            }
897            GuestCallKind::StartExplicit(fun) => {
898                fun(store)?;
899            }
900        }
901    }
902
903    Ok(())
904}
905
906impl<T> Store<T> {
907    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
908    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
909    where
910        T: Send + 'static,
911    {
912        self.as_context_mut().run_concurrent(fun).await
913    }
914
915    #[doc(hidden)]
916    pub fn assert_concurrent_state_empty(&mut self) {
917        self.as_context_mut().assert_concurrent_state_empty();
918    }
919
920    /// Convenience wrapper for [`StoreContextMut::spawn`].
921    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
922    where
923        T: 'static,
924    {
925        self.as_context_mut().spawn(task)
926    }
927}
928
929impl<T> StoreContextMut<'_, T> {
930    /// Assert that all the relevant tables and queues in the concurrent state
931    /// for this store are empty.
932    ///
933    /// This is for sanity checking in integration tests
934    /// (e.g. `component-async-tests`) that the relevant state has been cleared
935    /// after each test concludes.  This should help us catch leaks, e.g. guest
936    /// tasks which haven't been deleted despite having completed and having
937    /// been dropped by their supertasks.
938    #[doc(hidden)]
939    pub fn assert_concurrent_state_empty(self) {
940        let store = self.0;
941        store
942            .store_data_mut()
943            .components
944            .assert_instance_states_empty();
945        let state = store.concurrent_state_mut();
946        assert!(
947            state.table.get_mut().is_empty(),
948            "non-empty table: {:?}",
949            state.table.get_mut()
950        );
951        assert!(state.high_priority.is_empty());
952        assert!(state.low_priority.is_empty());
953        assert!(state.guest_thread.is_none());
954        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
955        assert!(state.global_error_context_ref_counts.is_empty());
956    }
957
958    /// Spawn a background task to run as part of this store's event loop.
959    ///
960    /// The task will receive an `&Accessor<T>` and run concurrently with
961    /// any other tasks in progress for the store.
962    ///
963    /// Note that the task will only make progress if and when the event loop
964    /// for this store is run.
965    ///
966    /// The returned [`JoinHandle`] may be used to cancel the task.
967    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
968    where
969        T: 'static,
970    {
971        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
972        self.spawn_with_accessor(accessor, task)
973    }
974
975    /// Internal implementation of `spawn` functions where a `store` is
976    /// available along with an `Accessor`.
977    fn spawn_with_accessor<D>(
978        self,
979        accessor: Accessor<T, D>,
980        task: impl AccessorTask<T, D>,
981    ) -> JoinHandle
982    where
983        T: 'static,
984        D: HasData + ?Sized,
985    {
986        // Create an "abortable future" here where internally the future will
987        // hook calls to poll and possibly spawn more background tasks on each
988        // iteration.
989        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
990        self.0
991            .concurrent_state_mut()
992            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
993        handle
994    }
995
996    /// Run the specified closure `fun` to completion as part of this store's
997    /// event loop.
998    ///
999    /// This will run `fun` as part of this store's event loop until it
1000    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1001    /// controlled access to the store and its data.
1002    ///
1003    /// This function can be used to invoke [`Func::call_concurrent`] for
1004    /// example within the async closure provided here.
1005    ///
1006    /// # Store-blocking behavior
1007    ///
1010    /// At this time there are certain situations in which the `Future` returned
1011    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1012    /// extended period of time, despite one or more `Waker::wake` events having
1013    /// occurred for the task to which it belongs.  This can manifest as the
1014    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1015    /// the `Store` being held by e.g. a blocking host function, preventing the
1016    /// `Future` from being polled. A canonical example of this is when the
1017    /// `fun` provided to this function attempts to set a timeout for an
1018    /// invocation of a wasm function. In this situation the async closure is
1019    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1020    /// to elapse. At this time this setup will not always work and the timeout
1021    /// may not reliably fire.
1022    ///
1023    /// This function will not block the current thread and as such is always
1024    /// suitable to run in an `async` context, but the current implementation of
1025    /// Wasmtime can lead to situations where a certain wasm computation is
1026    /// required to make progress before the closure can make progress. This is an
1027    /// artifact of Wasmtime's historical implementation of `async` functions
1028    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1029    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1030    /// progress for a readiness notification of (b) to get delivered.
1031    ///
1032    /// This effectively means that it's not possible to reliably perform a
1033    /// "select" operation within the `fun` closure, which timeouts for example
1034    /// are based on. Fixing this requires some relatively major refactoring
1035    /// work within Wasmtime itself. This is a known pitfall and one
1036    /// that is intended to be fixed one day. In the meantime it's recommended
1037    /// to apply timeouts or such to the entire `run_concurrent` call itself
1038    /// rather than internally.
1039    ///
1040    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1041    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
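    ///
    /// For instance, rather than racing a timeout against a guest call inside
    /// `fun`, it is currently more reliable to wrap the entire call, e.g. with
    /// Tokio (illustrative sketch only):
    ///
    /// ```ignore
    /// let result = tokio::time::timeout(
    ///     std::time::Duration::from_secs(5),
    ///     store.run_concurrent(async |accessor| {
    ///         // ... call guest exports here ...
    ///     }),
    /// )
    /// .await;
    /// ```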
1042    ///
1043    /// # Example
1044    ///
1045    /// ```
1046    /// # use {
1047    /// #   anyhow::{Result},
1048    /// #   wasmtime::{
1049    /// #     component::{ Component, Linker, Resource, ResourceTable},
1050    /// #     Config, Engine, Store
1051    /// #   },
1052    /// # };
1053    /// #
1054    /// # struct MyResource(u32);
1055    /// # struct Ctx { table: ResourceTable }
1056    /// #
1057    /// # async fn foo() -> Result<()> {
1058    /// # let mut config = Config::new();
    /// # config.async_support(true);
1059    /// # let engine = Engine::new(&config)?;
1060    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1061    /// # let mut linker = Linker::new(&engine);
1062    /// # let component = Component::new(&engine, "")?;
1063    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1064    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1065    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1066    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1067    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1068    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1069    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1070    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1071    ///    Ok(())
1072    /// }).await??;
1073    /// # Ok(())
1074    /// # }
1075    /// ```
1076    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1077    where
1078        T: Send + 'static,
1079    {
1080        self.do_run_concurrent(fun, false).await
1081    }
1082
1083    pub(super) async fn run_concurrent_trap_on_idle<R>(
1084        self,
1085        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1086    ) -> Result<R>
1087    where
1088        T: Send + 'static,
1089    {
1090        self.do_run_concurrent(fun, true).await
1091    }
1092
1093    async fn do_run_concurrent<R>(
1094        mut self,
1095        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1096        trap_on_idle: bool,
1097    ) -> Result<R>
1098    where
1099        T: Send + 'static,
1100    {
1101        check_recursive_run();
1102        let token = StoreToken::new(self.as_context_mut());
1103
1104        struct Dropper<'a, T: 'static, V> {
1105            store: StoreContextMut<'a, T>,
1106            value: ManuallyDrop<V>,
1107        }
1108
1109        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1110            fn drop(&mut self) {
1111                tls::set(self.store.0, || {
1112                    // SAFETY: Here we drop the value without moving it for the
1113                    // first and only time -- per the contract for `Drop::drop`,
1114                    // this code won't run again, and the `value` field will no
1115                    // longer be accessible.
1116                    unsafe { ManuallyDrop::drop(&mut self.value) }
1117                });
1118            }
1119        }
1120
1121        let accessor = &Accessor::new(token);
1122        let dropper = &mut Dropper {
1123            store: self,
1124            value: ManuallyDrop::new(fun(accessor)),
1125        };
1126        // SAFETY: We never move `dropper` nor its `value` field.
1127        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1128
1129        dropper
1130            .store
1131            .as_context_mut()
1132            .poll_until(future, trap_on_idle)
1133            .await
1134    }
1135
1136    /// Run this store's event loop.
1137    ///
1138    /// The returned future will resolve when the specified future completes or,
1139    /// if `trap_on_idle` is true, when the event loop can't make further
1140    /// progress.
1141    async fn poll_until<R>(
1142        mut self,
1143        mut future: Pin<&mut impl Future<Output = R>>,
1144        trap_on_idle: bool,
1145    ) -> Result<R>
1146    where
1147        T: Send + 'static,
1148    {
1149        struct Reset<'a, T: 'static> {
1150            store: StoreContextMut<'a, T>,
1151            futures: Option<FuturesUnordered<HostTaskFuture>>,
1152        }
1153
1154        impl<'a, T> Drop for Reset<'a, T> {
1155            fn drop(&mut self) {
1156                if let Some(futures) = self.futures.take() {
1157                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1158                }
1159            }
1160        }
1161
1162        loop {
1163            // Take `ConcurrentState::futures` out of the store so we can poll
1164            // it while also safely giving any of the futures inside access to
1165            // `self`.
1166            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1167            let mut reset = Reset {
1168                store: self.as_context_mut(),
1169                futures,
1170            };
1171            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1172
1173            let result = future::poll_fn(|cx| {
1174                // First, poll the future we were passed as an argument and
1175                // return immediately if it's ready.
1176                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1177                    return Poll::Ready(Ok(Either::Left(value)));
1178                }
1179
1180                // Next, poll `ConcurrentState::futures` (which includes any
1181                // pending host tasks and/or background tasks), returning
1182                // immediately if one of them fails.
1183                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1184                    Poll::Ready(Some(output)) => {
1185                        match output {
1186                            Err(e) => return Poll::Ready(Err(e)),
1187                            Ok(()) => {}
1188                        }
1189                        Poll::Ready(true)
1190                    }
1191                    Poll::Ready(None) => Poll::Ready(false),
1192                    Poll::Pending => Poll::Pending,
1193                };
1194
1195                // Next, check the "high priority" work queue and return
1196                // immediately if it has at least one item.
1197                let state = reset.store.0.concurrent_state_mut();
1198                let ready = mem::take(&mut state.high_priority);
1199                let ready = if ready.is_empty() {
1200                    // Next, check the "low priority" work queue and return
1201                    // immediately if it has at least one item.
1202                    let ready = mem::take(&mut state.low_priority);
1203                    if ready.is_empty() {
1204                        return match next {
1205                            Poll::Ready(true) => {
1206                                // In this case, one of the futures in
1207                                // `ConcurrentState::futures` completed
1208                                // successfully, so we return now and continue
1209                                // the outer loop in case there is another one
1210                                // ready to complete.
1211                                Poll::Ready(Ok(Either::Right(Vec::new())))
1212                            }
1213                            Poll::Ready(false) => {
1214                                // Poll the future we were passed one last time
1215                                // in case one of `ConcurrentState::futures` had
1216                                // the side effect of unblocking it.
1217                                if let Poll::Ready(value) =
1218                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
1219                                {
1220                                    Poll::Ready(Ok(Either::Left(value)))
1221                                } else {
1222                                    // In this case, there are no more pending
1223                                    // futures in `ConcurrentState::futures`,
1224                                    // there are no remaining work items, _and_
1225                                    // the future we were passed as an argument
1226                                    // still hasn't completed.
1227                                    if trap_on_idle {
1228                                        // `trap_on_idle` is true, so we exit
1229                                        // immediately.
1230                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1231                                    } else {
1232                                        // `trap_on_idle` is false, so we assume
1233                                        // that future will wake up and give us
1234                                        // more work to do when it's ready to.
1235                                        Poll::Pending
1236                                    }
1237                                }
1238                            }
1239                            // There is at least one pending future in
1240                            // `ConcurrentState::futures` and we have nothing
1241                            // else to do but wait for now, so we return
1242                            // `Pending`.
1243                            Poll::Pending => Poll::Pending,
1244                        };
1245                    } else {
1246                        ready
1247                    }
1248                } else {
1249                    ready
1250                };
1251
1252                Poll::Ready(Ok(Either::Right(ready)))
1253            })
1254            .await;
1255
1256            // Put the `ConcurrentState::futures` back into the store before we
1257            // return or handle any work items since one or more of those items
1258            // might append more futures.
1259            drop(reset);
1260
1261            match result? {
1262                // The future we were passed as an argument completed, so we
1263                // return the result.
1264                Either::Left(value) => break Ok(value),
1265                // The future we were passed has not yet completed, so handle
1266                // any work items and then loop again.
1267                Either::Right(ready) => {
1268                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1269                        store: StoreContextMut<'a, T>,
1270                        ready: I,
1271                    }
1272
1273                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1274                        fn drop(&mut self) {
1275                            while let Some(item) = self.ready.next() {
1276                                match item {
1277                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1278                                    WorkItem::PushFuture(future) => {
1279                                        tls::set(self.store.0, move || drop(future))
1280                                    }
1281                                    _ => {}
1282                                }
1283                            }
1284                        }
1285                    }
1286
1287                    let mut dispose = Dispose {
1288                        store: self.as_context_mut(),
1289                        ready: ready.into_iter(),
1290                    };
1291
1292                    while let Some(item) = dispose.ready.next() {
1293                        dispose
1294                            .store
1295                            .as_context_mut()
1296                            .handle_work_item(item)
1297                            .await?;
1298                    }
1299                }
1300            }
1301        }
1302    }
1303
1304    /// Handle the specified work item, possibly resuming a fiber if applicable.
1305    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1306    where
1307        T: Send,
1308    {
1309        log::trace!("handle work item {item:?}");
1310        match item {
1311            WorkItem::PushFuture(future) => {
1312                self.0
1313                    .concurrent_state_mut()
1314                    .futures
1315                    .get_mut()
1316                    .as_mut()
1317                    .unwrap()
1318                    .push(future.into_inner());
1319            }
1320            WorkItem::ResumeFiber(fiber) => {
1321                self.0.resume_fiber(fiber).await?;
1322            }
1323            WorkItem::GuestCall(call) => {
1324                if call.is_ready(self.0)? {
1325                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1326                } else {
1327                    let state = self.0.concurrent_state_mut();
1328                    let task = state.get_mut(call.thread.task)?;
1329                    if !task.starting_sent {
1330                        task.starting_sent = true;
1331                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1332                            Waitable::Guest(call.thread.task).set_event(
1333                                state,
1334                                Some(Event::Subtask {
1335                                    status: Status::Starting,
1336                                }),
1337                            )?;
1338                        }
1339                    }
1340
1341                    let instance = state.get_mut(call.thread.task)?.instance;
1342                    self.0
1343                        .instance_state(instance)
1344                        .pending
1345                        .insert(call.thread, call.kind);
1346                }
1347            }
1348            WorkItem::Poll(params) => {
1349                let state = self.0.concurrent_state_mut();
1350                if state.get_mut(params.thread.task)?.event.is_some()
1351                    || !state.get_mut(params.set)?.ready.is_empty()
1352                {
1353                    // There's at least one event immediately available; deliver
1354                    // it to the guest ASAP.
1355                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1356                        thread: params.thread,
1357                        kind: GuestCallKind::DeliverEvent {
1358                            instance: params.instance,
1359                            set: Some(params.set),
1360                        },
1361                    }));
1362                } else {
1363                    // There are no events immediately available; deliver
1364                    // `Event::None` to the guest.
1365                    state.get_mut(params.thread.task)?.event = Some(Event::None);
1366                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1367                        thread: params.thread,
1368                        kind: GuestCallKind::DeliverEvent {
1369                            instance: params.instance,
1370                            set: Some(params.set),
1371                        },
1372                    }));
1373                }
1374            }
1375            WorkItem::WorkerFunction(fun) => {
1376                self.run_on_worker(WorkerItem::Function(fun)).await?;
1377            }
1378        }
1379
1380        Ok(())
1381    }
1382
1383    /// Execute the specified work item on a worker fiber.
1384    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1385    where
1386        T: Send,
1387    {
1388        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1389            fiber
1390        } else {
1391            fiber::make_fiber(self.0, move |store| {
1392                loop {
1393                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1394                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1395                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1396                    }
1397
1398                    store.suspend(SuspendReason::NeedWork)?;
1399                }
1400            })?
1401        };
1402
1403        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1404        assert!(worker_item.is_none());
1405        *worker_item = Some(item);
1406
1407        self.0.resume_fiber(worker).await
1408    }
1409
1410    /// Wrap the specified host function in a future which will call it, passing
1411    /// it an `&Accessor<T>`.
1412    ///
1413    /// See the `Accessor` documentation for details.
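    ///
    /// A minimal sketch of the kind of closure this accepts (the host data
    /// type `MyData` is hypothetical):
    ///
    /// ```ignore
    /// let future = store.as_context_mut().wrap_call(|accessor: &Accessor<MyData>| {
    ///     Box::pin(async move {
    ///         // Use `accessor` here to reach the store as needed.
    ///         Ok(42u32)
    ///     })
    /// });
    /// ```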
1414    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1415    where
1416        T: 'static,
1417        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1418            + Send
1419            + Sync
1420            + 'static,
1421        R: Send + Sync + 'static,
1422    {
1423        let token = StoreToken::new(self);
1424        async move {
1425            let mut accessor = Accessor::new(token);
1426            closure(&mut accessor).await
1427        }
1428    }
1429}
1430
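/// Identifies a particular (sub-)component instance within a store: the
/// top-level component instance plus the runtime index of the sub-component
/// inside it.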
1431#[derive(Debug, Copy, Clone)]
1432pub struct RuntimeInstance {
1433    pub instance: ComponentInstanceId,
1434    pub index: RuntimeComponentInstanceIndex,
1435}
1436
1437impl StoreOpaque {
1438    /// Helper function to retrieve the `ConcurrentInstanceState` for the
1439    /// specified instance.
1440    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1441        StoreComponentInstanceId::new(self.id(), instance.instance)
1442            .get_mut(self)
1443            .instance_state(instance.index)
1444            .unwrap()
1445            .concurrent_state()
1446    }
1447
1448    /// Helper function to retrieve the `HandleTable` for the specified
1449    /// instance.
1450    fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1451        StoreComponentInstanceId::new(self.id(), instance.instance)
1452            .get_mut(self)
1453            .instance_state(instance.index)
1454            .unwrap()
1455            .handle_table()
1456    }
1457
1458    /// Record that we're about to enter a (sub-)component instance which does
1459    /// not support more than one concurrent, stackful activation, meaning it
1460    /// cannot be entered again until that call returns.
1461    fn enter_instance(&mut self, instance: RuntimeInstance) {
1462        log::trace!("enter {instance:?}");
1463        self.instance_state(instance).do_not_enter = true;
1464    }
1465
1466    /// Record that we've exited a (sub-)component instance previously entered
1467    /// with `Self::enter_instance`, then call `Self::partition_pending`.
1468    /// See the latter's documentation for details.
1469    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1470        log::trace!("exit {instance:?}");
1471        self.instance_state(instance).do_not_enter = false;
1472        self.partition_pending(instance)
1473    }
1474
1475    /// Iterate over `ConcurrentInstanceState::pending`, moving any ready items into the
1476    /// "high priority" work item queue.
1477    ///
1478    /// See `GuestCall::is_ready` for details.
1479    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1480        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1481            let call = GuestCall { thread, kind };
1482            if call.is_ready(self)? {
1483                self.concurrent_state_mut()
1484                    .push_high_priority(WorkItem::GuestCall(call));
1485            } else {
1486                self.instance_state(instance)
1487                    .pending
1488                    .insert(call.thread, call.kind);
1489            }
1490        }
1491
1492        Ok(())
1493    }
1494
1495    /// Implements the `backpressure.{inc,dec}` intrinsics.
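    ///
    /// A sketch of how the intrinsic implementations are expected to call this
    /// (the actual call sites live elsewhere):
    ///
    /// ```ignore
    /// // backpressure.inc
    /// store.backpressure_modify(caller_instance, |n| n.checked_add(1))?;
    /// // backpressure.dec
    /// store.backpressure_modify(caller_instance, |n| n.checked_sub(1))?;
    /// ```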
1496    pub(crate) fn backpressure_modify(
1497        &mut self,
1498        caller_instance: RuntimeInstance,
1499        modify: impl FnOnce(u16) -> Option<u16>,
1500    ) -> Result<()> {
1501        let state = self.instance_state(caller_instance);
1502        let old = state.backpressure;
1503        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
1504        state.backpressure = new;
1505
1506        if old > 0 && new == 0 {
1507            // Backpressure was previously enabled and is now disabled; move any
1508            // newly-eligible guest calls to the "high priority" queue.
1509            self.partition_pending(caller_instance)?;
1510        }
1511
1512        Ok(())
1513    }
1514
1515    /// Resume the specified fiber, giving it exclusive access to the specified
1516    /// store.
1517    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1518        let old_thread = self.concurrent_state_mut().guest_thread;
1519        log::trace!("resume_fiber: save current thread {old_thread:?}");
1520
1521        let fiber = fiber::resolve_or_release(self, fiber).await?;
1522
1523        let state = self.concurrent_state_mut();
1524
1525        state.guest_thread = old_thread;
1526        if let Some(ref ot) = old_thread {
1527            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1528        }
1529        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1530
1531        if let Some(mut fiber) = fiber {
1532            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1533            // See the `SuspendReason` documentation for what each case means.
1534            match state.suspend_reason.take().unwrap() {
1535                SuspendReason::NeedWork => {
1536                    if state.worker.is_none() {
1537                        state.worker = Some(fiber);
1538                    } else {
1539                        fiber.dispose(self);
1540                    }
1541                }
1542                SuspendReason::Yielding { thread, .. } => {
1543                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1544                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1545                }
1546                SuspendReason::ExplicitlySuspending { thread, .. } => {
1547                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1548                }
1549                SuspendReason::Waiting { set, thread, .. } => {
1550                    let old = state
1551                        .get_mut(set)?
1552                        .waiting
1553                        .insert(thread, WaitMode::Fiber(fiber));
1554                    assert!(old.is_none());
1555                }
1556            };
1557        } else {
1558            log::trace!("resume_fiber: fiber has exited");
1559        }
1560
1561        Ok(())
1562    }
1563
1564    /// Suspend the current fiber, storing the reason in
1565    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1566    /// it should be resumed.
1567    ///
1568    /// See the `SuspendReason` documentation for details.
1569    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1570        log::trace!("suspend fiber: {reason:?}");
1571
1572        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1573        // pop the call context which manages resource borrows before suspending
1574        // and then push it again once we've resumed.
1575        let task = match &reason {
1576            SuspendReason::Yielding { thread, .. }
1577            | SuspendReason::Waiting { thread, .. }
1578            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1579            SuspendReason::NeedWork => None,
1580        };
1581
1582        let old_guest_thread = if let Some(task) = task {
1583            self.maybe_pop_call_context(task)?;
1584            self.concurrent_state_mut().guest_thread
1585        } else {
1586            None
1587        };
1588
1589        // We should not have reached here unless either there's no current
1590        // task, or the current task is permitted to block.  In addition, we
1591        // special-case `thread.switch-to` and waiting for a subtask to go from
1592        // `starting` to `started`, both of which we consider non-blocking
1593        // operations despite requiring a suspend.
1594        assert!(
1595            matches!(
1596                reason,
1597                SuspendReason::ExplicitlySuspending {
1598                    skip_may_block_check: true,
1599                    ..
1600                } | SuspendReason::Waiting {
1601                    skip_may_block_check: true,
1602                    ..
1603                }
1604            ) || old_guest_thread
1605                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1606                .unwrap_or(true)
1607        );
1608
1609        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1610        assert!(suspend_reason.is_none());
1611        *suspend_reason = Some(reason);
1612
1613        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1614
1615        if let Some(task) = task {
1616            self.concurrent_state_mut().guest_thread = old_guest_thread;
1617            self.maybe_push_call_context(task)?;
1618        }
1619
1620        Ok(())
1621    }
1622
1623    /// Push the call context for managing resource borrows for the specified
1624    /// guest task if it has not yet either returned a result or cancelled
1625    /// itself.
1626    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1627        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1628
1629        if !task.returned_or_cancelled() {
1630            log::trace!("push call context for {guest_task:?}");
1631            let call_context = task.call_context.take().unwrap();
1632            self.component_resource_state().0.push(call_context);
1633        }
1634        Ok(())
1635    }
1636
1637    /// Pop the call context for managing resource borrows for the specified
1638    /// guest task if it has not yet either returned a result or cancelled
1639    /// itself.
1640    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1641        if !self
1642            .concurrent_state_mut()
1643            .get_mut(guest_task)?
1644            .returned_or_cancelled()
1645        {
1646            log::trace!("pop call context for {guest_task:?}");
1647            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1648            self.concurrent_state_mut()
1649                .get_mut(guest_task)?
1650                .call_context = call_context;
1651        }
1652        Ok(())
1653    }
1654
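    /// Suspend the current guest thread until the specified waitable delivers
    /// an event, temporarily joining it to the caller's `sync_call_set` and
    /// restoring its original waitable set once the wait completes.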
1655    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1656        let state = self.concurrent_state_mut();
1657        let caller = state.guest_thread.unwrap();
1658        let old_set = waitable.common(state)?.set;
1659        let set = state.get_mut(caller.task)?.sync_call_set;
1660        waitable.join(state, Some(set))?;
1661        self.suspend(SuspendReason::Waiting {
1662            set,
1663            thread: caller,
1664            skip_may_block_check: false,
1665        })?;
1666        let state = self.concurrent_state_mut();
1667        waitable.join(state, old_set)
1668    }
1669}
1670
1671impl Instance {
1672    /// Get the next pending event for the specified task and (optional)
1673    /// waitable set, along with the waitable handle if applicable.
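    ///
    /// If `cancellable` is false, a pending `Event::Cancelled` on the task is
    /// left in place rather than delivered.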
1674    fn get_event(
1675        self,
1676        store: &mut StoreOpaque,
1677        guest_task: TableId<GuestTask>,
1678        set: Option<TableId<WaitableSet>>,
1679        cancellable: bool,
1680    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1681        let state = store.concurrent_state_mut();
1682
1683        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1684            log::trace!("deliver event {event:?} to {guest_task:?}");
1685
1686            if cancellable || !matches!(event, Event::Cancelled) {
1687                return Ok(Some((event, None)));
1688            } else {
1689                state.get_mut(guest_task)?.event = Some(event);
1690            }
1691        }
1692
1693        Ok(
1694            if let Some((set, waitable)) = set
1695                .and_then(|set| {
1696                    state
1697                        .get_mut(set)
1698                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1699                        .transpose()
1700                })
1701                .transpose()?
1702            {
1703                let common = waitable.common(state)?;
1704                let handle = common.handle.unwrap();
1705                let event = common.event.take().unwrap();
1706
1707                log::trace!(
1708                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1709                );
1710
1711                waitable.on_delivery(store, self, event);
1712
1713                Some((event, Some((waitable, handle))))
1714            } else {
1715                None
1716            },
1717        )
1718    }
1719
1720    /// Handle the `CallbackCode` returned from an async-lifted export or its
1721    /// callback.
1722    ///
1723    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1724    /// using `handle_guest_call`.
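    ///
    /// A sketch of the packing `unpack_callback_code` undoes below (assuming
    /// the Canonical ABI layout of the code in the low bits and the
    /// waitable-set handle in the high bits):
    ///
    /// ```ignore
    /// let (code, set) = (packed & 0xf, packed >> 4);
    /// ```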
1725    fn handle_callback_code(
1726        self,
1727        store: &mut StoreOpaque,
1728        guest_thread: QualifiedThreadId,
1729        runtime_instance: RuntimeComponentInstanceIndex,
1730        code: u32,
1731    ) -> Result<Option<GuestCall>> {
1732        let (code, set) = unpack_callback_code(code);
1733
1734        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1735
1736        let state = store.concurrent_state_mut();
1737
1738        let get_set = |store: &mut StoreOpaque, handle| {
1739            if handle == 0 {
1740                bail!("invalid waitable-set handle");
1741            }
1742
1743            let set = store
1744                .handle_table(RuntimeInstance {
1745                    instance: self.id().instance(),
1746                    index: runtime_instance,
1747                })
1748                .waitable_set_rep(handle)?;
1749
1750            Ok(TableId::<WaitableSet>::new(set))
1751        };
1752
1753        Ok(match code {
1754            callback_code::EXIT => {
1755                log::trace!("implicit thread {guest_thread:?} completed");
1756                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1757                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1758                if task.threads.is_empty() && !task.returned_or_cancelled() {
1759                    bail!(Trap::NoAsyncResult);
1760                }
1761                match &task.caller {
1762                    Caller::Host { .. } => {
1763                        if task.ready_to_delete() {
1764                            Waitable::Guest(guest_thread.task)
1765                                .delete_from(store.concurrent_state_mut())?;
1766                        }
1767                    }
1768                    Caller::Guest { .. } => {
1769                        task.exited = true;
1770                        task.callback = None;
1771                    }
1772                }
1773                None
1774            }
1775            callback_code::YIELD => {
1776                let task = state.get_mut(guest_thread.task)?;
1777                assert!(task.event.is_none());
1778                task.event = Some(Event::None);
1779                let call = GuestCall {
1780                    thread: guest_thread,
1781                    kind: GuestCallKind::DeliverEvent {
1782                        instance: self,
1783                        set: None,
1784                    },
1785                };
1786                if state.may_block(guest_thread.task) {
1787                    // Push this thread onto the "low priority" queue so it runs
1788                    // after any other threads have had a chance to run.
1789                    state.push_low_priority(WorkItem::GuestCall(call));
1790                    None
1791                } else {
1792                    // Yielding in a non-blocking context is defined as a no-op
1793                    // according to the spec, so we must run this thread
1794                    // immediately without allowing any others to run.
1795                    Some(call)
1796                }
1797            }
1798            callback_code::WAIT | callback_code::POLL => {
1799                // The task may only return `WAIT` or `POLL` if it was created
1800                // for a call to an async export.  Otherwise, we'll trap.
1801                state.check_blocking_for(guest_thread.task)?;
1802
1803                let set = get_set(store, set)?;
1804                let state = store.concurrent_state_mut();
1805
1806                if state.get_mut(guest_thread.task)?.event.is_some()
1807                    || !state.get_mut(set)?.ready.is_empty()
1808                {
1809                    // An event is immediately available; deliver it ASAP.
1810                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1811                        thread: guest_thread,
1812                        kind: GuestCallKind::DeliverEvent {
1813                            instance: self,
1814                            set: Some(set),
1815                        },
1816                    }));
1817                } else {
1818                    // No event is immediately available.
1819                    match code {
1820                        callback_code::POLL => {
1821                            // We're polling, so just yield and check whether an
1822                            // event has arrived after that.
1823                            state.push_low_priority(WorkItem::Poll(PollParams {
1824                                instance: self,
1825                                thread: guest_thread,
1826                                set,
1827                            }));
1828                        }
1829                        callback_code::WAIT => {
1830                            // We're waiting, so register to be woken up when an
1831                            // event is published for this waitable set.
1832                            //
1833                            // Here we also set `GuestTask::wake_on_cancel`
1834                            // which allows `subtask.cancel` to interrupt the
1835                            // wait.
1836                            let old = state
1837                                .get_mut(guest_thread.thread)?
1838                                .wake_on_cancel
1839                                .replace(set);
1840                            assert!(old.is_none());
1841                            let old = state
1842                                .get_mut(set)?
1843                                .waiting
1844                                .insert(guest_thread, WaitMode::Callback(self));
1845                            assert!(old.is_none());
1846                        }
1847                        _ => unreachable!(),
1848                    }
1849                }
1850                None
1851            }
1852            _ => bail!("unsupported callback code: {code}"),
1853        })
1854    }
1855
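    /// Remove the specified guest thread from its instance's handle table and
    /// from its parent task's set of threads, deleting the thread's state from
    /// the concurrent state table.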
1856    fn cleanup_thread(
1857        self,
1858        store: &mut StoreOpaque,
1859        guest_thread: QualifiedThreadId,
1860        runtime_instance: RuntimeComponentInstanceIndex,
1861    ) -> Result<()> {
1862        let guest_id = store
1863            .concurrent_state_mut()
1864            .get_mut(guest_thread.thread)?
1865            .instance_rep;
1866        store
1867            .handle_table(RuntimeInstance {
1868                instance: self.id().instance(),
1869                index: runtime_instance,
1870            })
1871            .guest_thread_remove(guest_id.unwrap())?;
1872
1873        store.concurrent_state_mut().delete(guest_thread.thread)?;
1874        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1875        task.threads.remove(&guest_thread.thread);
1876        Ok(())
1877    }
1878
1879    /// Add the specified guest call to the "high priority" work item queue, to
1880    /// be started as soon as backpressure and/or reentrance rules allow.
1881    ///
1882    /// SAFETY: The raw pointer arguments must be valid references to guest
1883    /// functions (with the appropriate signatures) when the closures queued by
1884    /// this function are called.
1885    unsafe fn queue_call<T: 'static>(
1886        self,
1887        mut store: StoreContextMut<T>,
1888        guest_thread: QualifiedThreadId,
1889        callee: SendSyncPtr<VMFuncRef>,
1890        param_count: usize,
1891        result_count: usize,
1892        flags: Option<InstanceFlags>,
1893        async_: bool,
1894        callback: Option<SendSyncPtr<VMFuncRef>>,
1895        post_return: Option<SendSyncPtr<VMFuncRef>>,
1896    ) -> Result<()> {
1897        /// Return a closure which will call the specified function in the scope
1898        /// of the specified task.
1899        ///
1900        /// This will use `GuestTask::lower_params` to lower the parameters, but
1901        /// will not lift the result; instead, it returns a
1902        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1903        /// any, may be lifted.  Note that an async-lifted export will have
1904        /// returned its result using the `task.return` intrinsic (or not
1905        /// returned a result at all, in the case of `task.cancel`), in which
1906        /// case the "result" of this call will either be a callback code or
1907        /// nothing.
1908        ///
1909        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1910        /// the returned closure is called.
1911        unsafe fn make_call<T: 'static>(
1912            store: StoreContextMut<T>,
1913            guest_thread: QualifiedThreadId,
1914            callee: SendSyncPtr<VMFuncRef>,
1915            param_count: usize,
1916            result_count: usize,
1917            flags: Option<InstanceFlags>,
1918        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1919        + Send
1920        + Sync
1921        + 'static
1922        + use<T> {
1923            let token = StoreToken::new(store);
1924            move |store: &mut dyn VMStore| {
1925                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1926
1927                store
1928                    .concurrent_state_mut()
1929                    .get_mut(guest_thread.thread)?
1930                    .state = GuestThreadState::Running;
1931                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1932                let may_enter_after_call = task.call_post_return_automatically();
1933                let lower = task.lower_params.take().unwrap();
1934
1935                lower(store, &mut storage[..param_count])?;
1936
1937                let mut store = token.as_context_mut(store);
1938
1939                // SAFETY: Per the contract documented for `make_call`,
1940                // `callee` must be a valid pointer.
1941                unsafe {
1942                    if let Some(mut flags) = flags {
1943                        flags.set_may_enter(false);
1944                    }
1945                    crate::Func::call_unchecked_raw(
1946                        &mut store,
1947                        callee.as_non_null(),
1948                        NonNull::new(
1949                            &mut storage[..param_count.max(result_count)]
1950                                as *mut [MaybeUninit<ValRaw>] as _,
1951                        )
1952                        .unwrap(),
1953                    )?;
1954                    if let Some(mut flags) = flags {
1955                        flags.set_may_enter(may_enter_after_call);
1956                    }
1957                }
1958
1959                Ok(storage)
1960            }
1961        }
1962
1963        // SAFETY: Per the contract described in this function documentation,
1964        // the `callee` pointer which `call` closes over must be valid when
1965        // called by the closure we queue below.
1966        let call = unsafe {
1967            make_call(
1968                store.as_context_mut(),
1969                guest_thread,
1970                callee,
1971                param_count,
1972                result_count,
1973                flags,
1974            )
1975        };
1976
1977        let callee_instance = store
1978            .0
1979            .concurrent_state_mut()
1980            .get_mut(guest_thread.task)?
1981            .instance;
1982
1983        let fun = if callback.is_some() {
1984            assert!(async_);
1985
1986            Box::new(move |store: &mut dyn VMStore| {
1987                self.add_guest_thread_to_instance_table(
1988                    guest_thread.thread,
1989                    store,
1990                    callee_instance.index,
1991                )?;
1992                let old_thread = store
1993                    .concurrent_state_mut()
1994                    .guest_thread
1995                    .replace(guest_thread);
1996                log::trace!(
1997                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1998                );
1999
2000                store.maybe_push_call_context(guest_thread.task)?;
2001
2002                store.enter_instance(callee_instance);
2003
2004                // SAFETY: See the documentation for `make_call` to review the
2005                // contract we must uphold for `call` here.
2006                //
2007                // Per the contract described in the `queue_call`
2008                // documentation, the `callee` pointer which `call` closes
2009                // over must be valid.
2010                let storage = call(store)?;
2011
2012                store.exit_instance(callee_instance)?;
2013
2014                store.maybe_pop_call_context(guest_thread.task)?;
2015
2016                let state = store.concurrent_state_mut();
2017                state.guest_thread = old_thread;
2018                old_thread
2019                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2020                log::trace!("stackless call: restored {old_thread:?} as current thread");
2021
2022                // SAFETY: `wasmparser` will have validated that the callback
2023                // function returns a `i32` result.
2024                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2025
2026                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2027            })
2028                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2029        } else {
2030            let token = StoreToken::new(store.as_context_mut());
2031            Box::new(move |store: &mut dyn VMStore| {
2032                self.add_guest_thread_to_instance_table(
2033                    guest_thread.thread,
2034                    store,
2035                    callee_instance.index,
2036                )?;
2037                let old_thread = store
2038                    .concurrent_state_mut()
2039                    .guest_thread
2040                    .replace(guest_thread);
2041                log::trace!(
2042                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2043                );
2044                let mut flags = self.id().get(store).instance_flags(callee_instance.index);
2045
2046                store.maybe_push_call_context(guest_thread.task)?;
2047
2048                // Unless this is a callback-less (i.e. stackful)
2049                // async-lifted export, we need to record that the instance
2050                // cannot be entered until the call returns.
2051                if !async_ {
2052                    store.enter_instance(callee_instance);
2053                }
2054
2055                // SAFETY: See the documentation for `make_call` to review the
2056                // contract we must uphold for `call` here.
2057                //
2058                // Per the contract described in the `queue_call`
2059                // documentation, the `callee` pointer which `call` closes
2060                // over must be valid.
2061                let storage = call(store)?;
2062
2063                // This is a callback-less call, so the implicit thread has now completed.
2064                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2065
2066                if async_ {
2067                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2068                    if task.threads.is_empty() && !task.returned_or_cancelled() {
2069                        bail!(Trap::NoAsyncResult);
2070                    }
2071                } else {
2072                    // This is a sync-lifted export, so now is when we lift the
2073                    // result, optionally call the post-return function, if any,
2074                    // and finally notify any current or future waiters that the
2075                    // subtask has returned.
2076
2077                    let lift = {
2078                        store.exit_instance(callee_instance)?;
2079
2080                        let state = store.concurrent_state_mut();
2081                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2082
2083                        state
2084                            .get_mut(guest_thread.task)?
2085                            .lift_result
2086                            .take()
2087                            .unwrap()
2088                    };
2089
2090                    // SAFETY: `result_count` represents the number of core Wasm
2091                    // results returned, per `wasmparser`.
2092                    let result = (lift.lift)(store, unsafe {
2093                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2094                            &storage[..result_count],
2095                        )
2096                    })?;
2097
2098                    let post_return_arg = match result_count {
2099                        0 => ValRaw::i32(0),
2100                        // SAFETY: `result_count` represents the number of
2101                        // core Wasm results returned, per `wasmparser`.
2102                        1 => unsafe { storage[0].assume_init() },
2103                        _ => unreachable!(),
2104                    };
2105
2106                    if store
2107                        .concurrent_state_mut()
2108                        .get_mut(guest_thread.task)?
2109                        .call_post_return_automatically()
2110                    {
2111                        unsafe {
2112                            flags.set_may_leave(false);
2113                            flags.set_needs_post_return(false);
2114                        }
2115
2116                        if let Some(func) = post_return {
2117                            let mut store = token.as_context_mut(store);
2118
2119                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
2120                            // either `wasmtime-cranelift`-generated fused adapter
2121                            // code or `component::Options`.  Per `wasmparser`
2122                            // post-return signature validation, we know it takes a
2123                            // single parameter.
2124                            unsafe {
2125                                crate::Func::call_unchecked_raw(
2126                                    &mut store,
2127                                    func.as_non_null(),
2128                                    slice::from_ref(&post_return_arg).into(),
2129                                )?;
2130                            }
2131                        }
2132
2133                        unsafe {
2134                            flags.set_may_leave(true);
2135                            flags.set_may_enter(true);
2136                        }
2137                    }
2138
2139                    self.task_complete(
2140                        store,
2141                        guest_thread.task,
2142                        result,
2143                        Status::Returned,
2144                        post_return_arg,
2145                    )?;
2146                }
2147
2148                store.maybe_pop_call_context(guest_thread.task)?;
2149
2150                let state = store.concurrent_state_mut();
2151                let task = state.get_mut(guest_thread.task)?;
2152
2153                match &task.caller {
2154                    Caller::Host { .. } => {
2155                        if task.ready_to_delete() {
2156                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2157                        }
2158                    }
2159                    Caller::Guest { .. } => {
2160                        task.exited = true;
2161                    }
2162                }
2163
2164                Ok(None)
2165            })
2166        };
2167
2168        store
2169            .0
2170            .concurrent_state_mut()
2171            .push_high_priority(WorkItem::GuestCall(GuestCall {
2172                thread: guest_thread,
2173                kind: GuestCallKind::StartImplicit(fun),
2174            }));
2175
2176        Ok(())
2177    }
2178
2179    /// Prepare (but do not start) a guest->guest call.
2180    ///
2181    /// This is called from fused adapter code generated in
2182    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2183    /// are synthesized Wasm functions which move the parameters from the caller
2184    /// to the callee and the result from the callee to the caller,
2185    /// respectively.  The adapter will call `Self::start_call` immediately
2186    /// after calling this function.
2187    ///
2188    /// SAFETY: All the pointer arguments must be valid pointers to guest
2189    /// entities (and with the expected signatures for the function references
2190    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
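    ///
    /// A sketch of the adapter-side sequence (the adapter is generated Wasm;
    /// this is pseudocode rather than its literal output):
    ///
    /// ```ignore
    /// prepare_call(start, return_, /* ... */);               // create the guest task/thread
    /// let packed = start_call(callback, post_return, callee, /* ... */);
    /// // `packed` holds the subtask status plus, for async callers, a waitable handle
    /// ```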
2191    unsafe fn prepare_call<T: 'static>(
2192        self,
2193        mut store: StoreContextMut<T>,
2194        start: *mut VMFuncRef,
2195        return_: *mut VMFuncRef,
2196        caller_instance: RuntimeComponentInstanceIndex,
2197        callee_instance: RuntimeComponentInstanceIndex,
2198        task_return_type: TypeTupleIndex,
2199        callee_async: bool,
2200        memory: *mut VMMemoryDefinition,
2201        string_encoding: u8,
2202        caller_info: CallerInfo,
2203    ) -> Result<()> {
2204        self.id().get(store.0).check_may_leave(caller_instance)?;
2205
2206        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2207            // A task may only call an async-typed function via a sync lower if
2208            // it was created by a call to an async export.  Otherwise, we'll
2209            // trap.
2210            store.0.concurrent_state_mut().check_blocking()?;
2211        }
2212
2213        enum ResultInfo {
2214            Heap { results: u32 },
2215            Stack { result_count: u32 },
2216        }
2217
2218        let result_info = match &caller_info {
2219            CallerInfo::Async {
2220                has_result: true,
2221                params,
2222            } => ResultInfo::Heap {
2223                results: params.last().unwrap().get_u32(),
2224            },
2225            CallerInfo::Async {
2226                has_result: false, ..
2227            } => ResultInfo::Stack { result_count: 0 },
2228            CallerInfo::Sync {
2229                result_count,
2230                params,
2231            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2232                results: params.last().unwrap().get_u32(),
2233            },
2234            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2235                result_count: *result_count,
2236            },
2237        };
2238
2239        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2240
2241        // Create a new guest task for the call, closing over the `start` and
2242        // `return_` functions to lift the parameters and lower the result,
2243        // respectively.
2244        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2245        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2246        let token = StoreToken::new(store.as_context_mut());
2247        let state = store.0.concurrent_state_mut();
2248        let old_thread = state.guest_thread.take();
2249        let new_task = GuestTask::new(
2250            state,
2251            Box::new(move |store, dst| {
2252                let mut store = token.as_context_mut(store);
2253                assert!(dst.len() <= MAX_FLAT_PARAMS);
2254                // The `+ 1` here accounts for the return pointer, if any:
2255                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2256                let count = match caller_info {
2257                    // Async callers, if they have a result, use the last
2258                    // parameter as a return pointer so chop that off if
2259                    // relevant here.
2260                    CallerInfo::Async { params, has_result } => {
2261                        let params = &params[..params.len() - usize::from(has_result)];
2262                        for (param, src) in params.iter().zip(&mut src) {
2263                            src.write(*param);
2264                        }
2265                        params.len()
2266                    }
2267
2268                    // Sync callers forward everything directly.
2269                    CallerInfo::Sync { params, .. } => {
2270                        for (param, src) in params.iter().zip(&mut src) {
2271                            src.write(*param);
2272                        }
2273                        params.len()
2274                    }
2275                };
2276                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2277                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2278                // how it was constructed (see
2279                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2280                // for details), we know it takes `count` parameters and returns
2281                // `dst.len()` results.
2282                unsafe {
2283                    crate::Func::call_unchecked_raw(
2284                        &mut store,
2285                        start.as_non_null(),
2286                        NonNull::new(
2287                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2288                        )
2289                        .unwrap(),
2290                    )?;
2291                }
2292                dst.copy_from_slice(&src[..dst.len()]);
2293                let state = store.0.concurrent_state_mut();
2294                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2295                    state,
2296                    Some(Event::Subtask {
2297                        status: Status::Started,
2298                    }),
2299                )?;
2300                Ok(())
2301            }),
2302            LiftResult {
2303                lift: Box::new(move |store, src| {
2304                    // SAFETY: See comment in closure passed as `lower_params`
2305                    // parameter above.
2306                    let mut store = token.as_context_mut(store);
2307                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2308                    if let ResultInfo::Heap { results } = &result_info {
2309                        my_src.push(ValRaw::u32(*results));
2310                    }
2311                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2312                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2313                    // on how it was constructed (see
2314                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2315                    // for details) we know it takes `src.len()` parameters and
2316                    // returns up to 1 result.
2317                    unsafe {
2318                        crate::Func::call_unchecked_raw(
2319                            &mut store,
2320                            return_.as_non_null(),
2321                            my_src.as_mut_slice().into(),
2322                        )?;
2323                    }
2324                    let state = store.0.concurrent_state_mut();
2325                    let thread = state.guest_thread.unwrap();
2326                    if sync_caller {
2327                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2328                            if let ResultInfo::Stack { result_count } = &result_info {
2329                                match result_count {
2330                                    0 => None,
2331                                    1 => Some(my_src[0]),
2332                                    _ => unreachable!(),
2333                                }
2334                            } else {
2335                                None
2336                            },
2337                        );
2338                    }
2339                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2340                }),
2341                ty: task_return_type,
2342                memory: NonNull::new(memory).map(SendSyncPtr::new),
2343                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2344            },
2345            Caller::Guest {
2346                thread: old_thread.unwrap(),
2347                instance: caller_instance,
2348            },
2349            None,
2350            self,
2351            callee_instance,
2352            callee_async,
2353        )?;
2354
2355        let guest_task = state.push(new_task)?;
2356        let new_thread = GuestThread::new_implicit(guest_task);
2357        let guest_thread = state.push(new_thread)?;
2358        state.get_mut(guest_task)?.threads.insert(guest_thread);
2359
2360        let state = store.0.concurrent_state_mut();
2361        if let Some(old_thread) = old_thread {
2362            if !state.may_enter(guest_task) {
2363                bail!(crate::Trap::CannotEnterComponent);
2364            }
2365
2366            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2367        };
2368
2369        // Make the new thread the current one so that `Self::start_call` knows
2370        // which one to start.
2371        state.guest_thread = Some(QualifiedThreadId {
2372            task: guest_task,
2373            thread: guest_thread,
2374        });
2375        log::trace!(
2376            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2377        );
2378
2379        Ok(())
2380    }
2381
2382    /// Call the specified callback function for an async-lifted export.
2383    ///
2384    /// SAFETY: `function` must be a valid reference to a guest function of the
2385    /// correct signature for a callback.
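    ///
    /// For reference, the core-Wasm callback signature invoked here is
    /// effectively `(i32 event, i32 waitable, i32 payload) -> i32 code`, which
    /// is why three `ValRaw` parameters are lowered below and the first slot is
    /// read back as the returned code.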
2386    unsafe fn call_callback<T>(
2387        self,
2388        mut store: StoreContextMut<T>,
2389        callee_instance: RuntimeComponentInstanceIndex,
2390        function: SendSyncPtr<VMFuncRef>,
2391        event: Event,
2392        handle: u32,
2393        may_enter_after_call: bool,
2394    ) -> Result<u32> {
2395        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2396
2397        let (ordinal, result) = event.parts();
2398        let params = &mut [
2399            ValRaw::u32(ordinal),
2400            ValRaw::u32(handle),
2401            ValRaw::u32(result),
2402        ];
2403        // SAFETY: `function` is a valid `*mut VMFuncRef` from either
2404        // `wasmtime-cranelift`-generated fused adapter code or
2405        // `component::Options`.  Per `wasmparser` callback signature
2406        // validation, we know it takes three parameters and returns one.
2407        unsafe {
2408            flags.set_may_enter(false);
2409            crate::Func::call_unchecked_raw(
2410                &mut store,
2411                function.as_non_null(),
2412                params.as_mut_slice().into(),
2413            )?;
2414            flags.set_may_enter(may_enter_after_call);
2415        }
2416        Ok(params[0].get_u32())
2417    }
2418
2419    /// Start a guest->guest call previously prepared using
2420    /// `Self::prepare_call`.
2421    ///
2422    /// This is called from fused adapter code generated in
2423    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2424    /// this function immediately after calling `Self::prepare_call`.
2425    ///
2426    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2427    /// functions with the appropriate signatures for the current guest task.
2428    /// If this is a call to an async-lowered import, the actual call may be
2429    /// deferred and run after this function returns, in which case the pointer
2430    /// arguments must also be valid when the call happens.
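    ///
    /// On success this returns the packed subtask status produced by
    /// `Status::pack`, which includes the new waitable handle when the caller
    /// used an async-lowered import and the callee has not yet returned.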
2431    unsafe fn start_call<T: 'static>(
2432        self,
2433        mut store: StoreContextMut<T>,
2434        callback: *mut VMFuncRef,
2435        post_return: *mut VMFuncRef,
2436        callee: *mut VMFuncRef,
2437        param_count: u32,
2438        result_count: u32,
2439        flags: u32,
2440        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2441    ) -> Result<u32> {
2442        let token = StoreToken::new(store.as_context_mut());
2443        let async_caller = storage.is_none();
2444        let state = store.0.concurrent_state_mut();
2445        let guest_thread = state.guest_thread.unwrap();
2446        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2447        let may_enter_after_call = state
2448            .get_mut(guest_thread.task)?
2449            .call_post_return_automatically();
2450        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2451        let param_count = usize::try_from(param_count).unwrap();
2452        assert!(param_count <= MAX_FLAT_PARAMS);
2453        let result_count = usize::try_from(result_count).unwrap();
2454        assert!(result_count <= MAX_FLAT_RESULTS);
2455
2456        let task = state.get_mut(guest_thread.task)?;
2457        if !callback.is_null() {
2458            // We're calling an async-lifted export with a callback, so store
2459            // the callback and related context as part of the task so we can
2460            // call it later when needed.
2461            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2462            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2463                let store = token.as_context_mut(store);
2464                unsafe {
2465                    self.call_callback::<T>(
2466                        store,
2467                        runtime_instance,
2468                        callback,
2469                        event,
2470                        handle,
2471                        may_enter_after_call,
2472                    )
2473                }
2474            }));
2475        }
2476
2477        let Caller::Guest {
2478            thread: caller,
2479            instance: runtime_instance,
2480        } = &task.caller
2481        else {
2482            // As of this writing, `start_call` is only used for guest->guest
2483            // calls.
2484            unreachable!()
2485        };
2486        let caller = *caller;
2487        let caller_instance = *runtime_instance;
2488
2489        let callee_instance = task.instance;
2490
2491        let instance_flags = if callback.is_null() {
2492            None
2493        } else {
2494            Some(self.id().get(store.0).instance_flags(callee_instance.index))
2495        };
2496
2497        // Queue the call as a "high priority" work item.
2498        unsafe {
2499            self.queue_call(
2500                store.as_context_mut(),
2501                guest_thread,
2502                callee,
2503                param_count,
2504                result_count,
2505                instance_flags,
2506                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2507                NonNull::new(callback).map(SendSyncPtr::new),
2508                NonNull::new(post_return).map(SendSyncPtr::new),
2509            )?;
2510        }
2511
2512        let state = store.0.concurrent_state_mut();
2513
2514        // Use the caller's `GuestTask::sync_call_set` to register interest in
2515        // the subtask...
2516        let guest_waitable = Waitable::Guest(guest_thread.task);
2517        let old_set = guest_waitable.common(state)?.set;
2518        let set = state.get_mut(caller.task)?.sync_call_set;
2519        guest_waitable.join(state, Some(set))?;
2520
2521        // ... and suspend this fiber temporarily while we wait for it to start.
2522        //
2523        // Note that we _could_ call the callee directly using the current fiber
2524        // rather than suspend this one, but that would make reasoning about the
2525        // event loop more complicated and is probably only worth doing if
2526        // there's a measurable performance benefit.  In addition, it would mean
2527        // blocking the caller if the callee calls a blocking sync-lowered
2528        // import, and as of this writing the spec says we must not do that.
2529        //
2530        // Alternatively, the fused adapter code could be modified to call the
2531        // callee directly without calling a host-provided intrinsic at all (in
2532        // which case it would need to do its own, inline backpressure checks,
2533        // etc.).  Again, we'd want to see a measurable performance benefit
2534        // before committing to such an optimization.  And again, we'd need to
2535        // update the spec to allow that.
2536        let (status, waitable) = loop {
2537            store.0.suspend(SuspendReason::Waiting {
2538                set,
2539                thread: caller,
2540                // Normally, `StoreOpaque::suspend` would assert it's being
2541                // called from a context where blocking is allowed.  However, if
2542                // `async_caller` is `true`, we'll only "block" long enough for
2543                // the callee to start, i.e. we won't repeat this loop, so we
2544                // tell `suspend` it's okay even if we're not allowed to block.
2545                // Alternatively, if the callee is not an async function, then
2546                // we know it won't block anyway.
2547                skip_may_block_check: async_caller || !callee_async,
2548            })?;
2549
2550            let state = store.0.concurrent_state_mut();
2551
2552            log::trace!("taking event for {:?}", guest_thread.task);
2553            let event = guest_waitable.take_event(state)?;
2554            let Some(Event::Subtask { status }) = event else {
2555                unreachable!();
2556            };
2557
2558            log::trace!("status {status:?} for {:?}", guest_thread.task);
2559
2560            if status == Status::Returned {
2561                // It returned, so we can stop waiting.
2562                break (status, None);
2563            } else if async_caller {
2564                // It hasn't returned yet, but the caller is calling via an
2565                // async-lowered import, so we generate a handle for the task
2566                // waitable and return the status.
2567                let handle = store
2568                    .0
2569                    .handle_table(RuntimeInstance {
2570                        instance: self.id().instance(),
2571                        index: caller_instance,
2572                    })
2573                    .subtask_insert_guest(guest_thread.task.rep())?;
2574                store
2575                    .0
2576                    .concurrent_state_mut()
2577                    .get_mut(guest_thread.task)?
2578                    .common
2579                    .handle = Some(handle);
2580                break (status, Some(handle));
2581            } else {
2582                // The callee hasn't returned yet, and the caller is calling via
2583                // a sync-lowered import, so we loop and keep waiting until the
2584                // callee returns.
2585            }
2586        };
2587
2588        let state = store.0.concurrent_state_mut();
2589
2590        guest_waitable.join(state, old_set)?;
2591
2592        if let Some(storage) = storage {
2593            // The caller used a sync-lowered import to call an async-lifted
2594            // export, in which case the result, if any, has been stashed in
2595            // `GuestTask::sync_result`.
2596            let task = state.get_mut(guest_thread.task)?;
2597            if let Some(result) = task.sync_result.take() {
2598                if let Some(result) = result {
2599                    storage[0] = MaybeUninit::new(result);
2600                }
2601
2602                if task.exited {
2603                    if task.ready_to_delete() {
2604                        Waitable::Guest(guest_thread.task).delete_from(state)?;
2605                    }
2606                }
2607            }
2608        }
2609
2610        // Reset the current thread to point to the caller as it resumes control.
2611        state.guest_thread = Some(caller);
2612        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2613        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2614
2615        Ok(status.pack(waitable))
2616    }
2617
2618    /// Poll the specified future once on behalf of a guest->host call using an
2619    /// async-lowered import.
2620    ///
2621    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2622    /// `Pending`, add it to the set of futures to be polled as part of this
2623    /// instance's event loop until it completes, and then return
2624    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2625    ///
2626    /// Whether the future returns `Ready` immediately or later, the `lower`
2627    /// function will be used to lower the result, if any, into the guest caller's
2628    /// stack and linear memory unless the task has been cancelled.
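    ///
    /// A rough sketch of the intended call pattern (the surrounding caller and
    /// the `do_host_work`, `lower_into_guest`, and `return_handle_to_guest`
    /// helpers are hypothetical, shown only for illustration; not a doctest):
    ///
    /// ```ignore
    /// let waitable = instance.first_poll(
    ///     store.as_context_mut(),
    ///     async move { do_host_work().await },  // host work backing the import
    ///     caller_instance,
    ///     |store, result| lower_into_guest(store, result),
    /// )?;
    /// match waitable {
    ///     // Completed immediately: the result has already been lowered and no
    ///     // waitable needs to be returned to the guest.
    ///     None => {}
    ///     // Still pending: `handle` is returned to the guest as a waitable.
    ///     Some(handle) => return_handle_to_guest(handle),
    /// }
    /// ```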
2629    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2630        self,
2631        mut store: StoreContextMut<'_, T>,
2632        future: impl Future<Output = Result<R>> + Send + 'static,
2633        caller_instance: RuntimeComponentInstanceIndex,
2634        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2635    ) -> Result<Option<u32>> {
2636        let token = StoreToken::new(store.as_context_mut());
2637        let state = store.0.concurrent_state_mut();
2638        let caller = state.guest_thread.unwrap();
2639
2640        // Create an abortable future which hooks calls to poll and manages call
2641        // context state for the future.
2642        let (join_handle, future) = JoinHandle::run(async move {
2643            let mut future = pin!(future);
2644            let mut call_context = None;
2645            future::poll_fn(move |cx| {
2646                // Push the call context for managing any resource borrows
2647                // for the task.
2648                tls::get(|store| {
2649                    if let Some(call_context) = call_context.take() {
2650                        token
2651                            .as_context_mut(store)
2652                            .0
2653                            .component_resource_state()
2654                            .0
2655                            .push(call_context);
2656                    }
2657                });
2658
2659                let result = future.as_mut().poll(cx);
2660
2661                if result.is_pending() {
2662                    // Pop the call context for managing any resource
2663                    // borrows for the task.
2664                    tls::get(|store| {
2665                        call_context = Some(
2666                            token
2667                                .as_context_mut(store)
2668                                .0
2669                                .component_resource_state()
2670                                .0
2671                                .pop()
2672                                .unwrap(),
2673                        );
2674                    });
2675                }
2676                result
2677            })
2678            .await
2679        });
2680
2681        // We create a new host task even though it might complete immediately
2682        // (in which case we won't need to pass a waitable back to the guest).
2683        // If it does complete immediately, we'll remove it before we return.
2684        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2685
2686        log::trace!("new host task child of {caller:?}: {task:?}");
2687
2688        let mut future = Box::pin(future);
2689
2690        // Finally, poll the future.  We can use a dummy `Waker` here because
2691        // we'll add the future to `ConcurrentState::futures` and poll it
2692        // automatically from the event loop if it doesn't complete immediately
2693        // here.
2694        let poll = tls::set(store.0, || {
2695            future
2696                .as_mut()
2697                .poll(&mut Context::from_waker(&Waker::noop()))
2698        });
2699
2700        Ok(match poll {
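            // `Poll::Ready(None)` would mean the task had been cancelled via
            // its `JoinHandle`, but the guest hasn't even received a handle for
            // it yet, so that case can't happen here.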
2701            Poll::Ready(None) => unreachable!(),
2702            Poll::Ready(Some(result)) => {
2703                // It finished immediately; lower the result and delete the
2704                // task.
2705                lower(store.as_context_mut(), result?)?;
2706                log::trace!("delete host task {task:?} (already ready)");
2707                store.0.concurrent_state_mut().delete(task)?;
2708                None
2709            }
2710            Poll::Pending => {
2711                // It hasn't finished yet; add the future to
2712                // `ConcurrentState::futures` so it will be polled by the event
2713                // loop and allocate a waitable handle to return to the guest.
2714
2715                // Wrap the future in a closure responsible for lowering the result into
2716                // the guest's stack and memory, as well as notifying any waiters that
2717                // the task returned.
2718                let future =
2719                    Box::pin(async move {
2720                        let result = match future.await {
2721                            Some(result) => result?,
2722                            // Task was cancelled; nothing left to do.
2723                            None => return Ok(()),
2724                        };
2725                        tls::get(move |store| {
2726                            // Here we schedule a task to run on a worker fiber to do
2727                            // the lowering since it may involve a call to the guest's
2728                            // realloc function.  This is necessary because calling the
2729                            // guest while there are host embedder frames on the stack
2730                            // is unsound.
2731                            store.concurrent_state_mut().push_high_priority(
2732                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2733                                    lower(token.as_context_mut(store), result)?;
2734                                    let state = store.concurrent_state_mut();
2735                                    state.get_mut(task)?.join_handle.take();
2736                                    Waitable::Host(task).set_event(
2737                                        state,
2738                                        Some(Event::Subtask {
2739                                            status: Status::Returned,
2740                                        }),
2741                                    )
2742                                }))),
2743                            );
2744                            Ok(())
2745                        })
2746                    });
2747
2748                store.0.concurrent_state_mut().push_future(future);
2749                let handle = store
2750                    .0
2751                    .handle_table(RuntimeInstance {
2752                        instance: self.id().instance(),
2753                        index: caller_instance,
2754                    })
2755                    .subtask_insert_host(task.rep())?;
2756                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2757                log::trace!(
2758                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2759                );
2760                Some(handle)
2761            }
2762        })
2763    }
2764
2765    /// Implements the `task.return` intrinsic, lifting the result for the
2766    /// current guest task.
2767    pub(crate) fn task_return(
2768        self,
2769        store: &mut dyn VMStore,
2770        caller: RuntimeComponentInstanceIndex,
2771        ty: TypeTupleIndex,
2772        options: OptionsIndex,
2773        storage: &[ValRaw],
2774    ) -> Result<()> {
2775        self.id().get(store).check_may_leave(caller)?;
2776        let state = store.concurrent_state_mut();
2777        let guest_thread = state.guest_thread.unwrap();
2778        let lift = state
2779            .get_mut(guest_thread.task)?
2780            .lift_result
2781            .take()
2782            .ok_or_else(|| {
2783                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2784            })?;
2785        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2786
2787        let CanonicalOptions {
2788            string_encoding,
2789            data_model,
2790            ..
2791        } = &self.id().get(store).component().env_component().options[options];
2792
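        // `task.return` must match the result type, string encoding, and
        // linear memory recorded when the call was prepared; otherwise we trap.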
2793        let invalid = ty != lift.ty
2794            || string_encoding != &lift.string_encoding
2795            || match data_model {
2796                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2797                    Some(memory) => {
2798                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2799                        let actual = self.id().get(store).runtime_memory(memory);
2800                        expected != actual.as_ptr()
2801                    }
2802                    // No memory was specified, which validation only permits
2803                    // when none is needed, so this is not invalid.
2804                    None => false,
2805                },
2806                // Always invalid as this isn't supported.
2807                CanonicalOptionsDataModel::Gc { .. } => true,
2808            };
2809
2810        if invalid {
2811            bail!("invalid `task.return` signature and/or options for current task");
2812        }
2813
2814        log::trace!("task.return for {guest_thread:?}");
2815
2816        let result = (lift.lift)(store, storage)?;
2817        self.task_complete(
2818            store,
2819            guest_thread.task,
2820            result,
2821            Status::Returned,
2822            ValRaw::i32(0),
2823        )
2824    }
2825
2826    /// Implements the `task.cancel` intrinsic.
2827    pub(crate) fn task_cancel(
2828        self,
2829        store: &mut StoreOpaque,
2830        caller: RuntimeComponentInstanceIndex,
2831    ) -> Result<()> {
2832        self.id().get(store).check_may_leave(caller)?;
2833        let state = store.concurrent_state_mut();
2834        let guest_thread = state.guest_thread.unwrap();
2835        let task = state.get_mut(guest_thread.task)?;
2836        if !task.cancel_sent {
2837            bail!("`task.cancel` called by task which has not been cancelled")
2838        }
2839        _ = task.lift_result.take().ok_or_else(|| {
2840            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2841        })?;
2842
2843        assert!(task.result.is_none());
2844
2845        log::trace!("task.cancel for {guest_thread:?}");
2846
2847        self.task_complete(
2848            store,
2849            guest_thread.task,
2850            Box::new(DummyResult),
2851            Status::ReturnCancelled,
2852            ValRaw::i32(0),
2853        )
2854    }
2855
2856    /// Complete the specified guest task (i.e. indicate that it has either
2857    /// returned a (possibly empty) result or cancelled itself).
2858    ///
2859    /// This will return any resource borrows and notify any current or future
2860    /// waiters that the task has completed.
2861    fn task_complete(
2862        self,
2863        store: &mut StoreOpaque,
2864        guest_task: TableId<GuestTask>,
2865        result: Box<dyn Any + Send + Sync>,
2866        status: Status,
2867        post_return_arg: ValRaw,
2868    ) -> Result<()> {
2869        if store
2870            .concurrent_state_mut()
2871            .get_mut(guest_task)?
2872            .call_post_return_automatically()
2873        {
2874            let (calls, host_table, _, instance) =
2875                store.component_resource_state_with_instance(self);
2876            ResourceTables {
2877                calls,
2878                host_table: Some(host_table),
2879                guest: Some(instance.instance_states()),
2880            }
2881            .exit_call()?;
2882        } else {
2883            // As of this writing, the only scenario where `call_post_return_automatically`
2884            // would be false for a `GuestTask` is for host-to-guest calls using
2885            // `[Typed]Func::call_async`, in which case the `function_index`
2886            // should be a non-`None` value.
2887            let function_index = store
2888                .concurrent_state_mut()
2889                .get_mut(guest_task)?
2890                .function_index
2891                .unwrap();
2892            self.id()
2893                .get_mut(store)
2894                .post_return_arg_set(function_index, post_return_arg);
2895        }
2896
2897        let state = store.concurrent_state_mut();
2898        let task = state.get_mut(guest_task)?;
2899
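        // Deliver the result: for a host caller, send it over the oneshot
        // channel (if the receiver is still interested); for a guest caller,
        // stash it on the task and queue a `Subtask` event for any waiters.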
2900        if let Caller::Host { tx, .. } = &mut task.caller {
2901            if let Some(tx) = tx.take() {
2902                _ = tx.send(result);
2903            }
2904        } else {
2905            task.result = Some(result);
2906            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2907        }
2908
2909        Ok(())
2910    }
2911
2912    /// Implements the `waitable-set.new` intrinsic.
2913    pub(crate) fn waitable_set_new(
2914        self,
2915        store: &mut StoreOpaque,
2916        caller_instance: RuntimeComponentInstanceIndex,
2917    ) -> Result<u32> {
2918        self.id().get_mut(store).check_may_leave(caller_instance)?;
2919        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2920        let handle = store
2921            .handle_table(RuntimeInstance {
2922                instance: self.id().instance(),
2923                index: caller_instance,
2924            })
2925            .waitable_set_insert(set.rep())?;
2926        log::trace!("new waitable set {set:?} (handle {handle})");
2927        Ok(handle)
2928    }
2929
2930    /// Implements the `waitable-set.drop` intrinsic.
2931    pub(crate) fn waitable_set_drop(
2932        self,
2933        store: &mut StoreOpaque,
2934        caller_instance: RuntimeComponentInstanceIndex,
2935        set: u32,
2936    ) -> Result<()> {
2937        self.id().get_mut(store).check_may_leave(caller_instance)?;
2938        let rep = store
2939            .handle_table(RuntimeInstance {
2940                instance: self.id().instance(),
2941                index: caller_instance,
2942            })
2943            .waitable_set_remove(set)?;
2944
2945        log::trace!("drop waitable set {rep} (handle {set})");
2946
2947        let set = store
2948            .concurrent_state_mut()
2949            .delete(TableId::<WaitableSet>::new(rep))?;
2950
2951        if !set.waiting.is_empty() {
2952            bail!("cannot drop waitable set with waiters");
2953        }
2954
2955        Ok(())
2956    }
2957
2958    /// Implements the `waitable.join` intrinsic.
2959    pub(crate) fn waitable_join(
2960        self,
2961        store: &mut StoreOpaque,
2962        caller_instance: RuntimeComponentInstanceIndex,
2963        waitable_handle: u32,
2964        set_handle: u32,
2965    ) -> Result<()> {
2966        let mut instance = self.id().get_mut(store);
2967        instance.check_may_leave(caller_instance)?;
2968        let waitable =
2969            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2970
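        // A `set_handle` of zero means "remove the waitable from whatever set
        // it currently belongs to" rather than naming an actual set.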
2971        let set = if set_handle == 0 {
2972            None
2973        } else {
2974            let set = instance.instance_states().0[caller_instance]
2975                .handle_table()
2976                .waitable_set_rep(set_handle)?;
2977
2978            Some(TableId::<WaitableSet>::new(set))
2979        };
2980
2981        log::trace!(
2982            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2983        );
2984
2985        waitable.join(store.concurrent_state_mut(), set)
2986    }
2987
2988    /// Implements the `subtask.drop` intrinsic.
2989    pub(crate) fn subtask_drop(
2990        self,
2991        store: &mut StoreOpaque,
2992        caller_instance: RuntimeComponentInstanceIndex,
2993        task_id: u32,
2994    ) -> Result<()> {
2995        self.id().get_mut(store).check_may_leave(caller_instance)?;
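        // Detach the subtask from any waitable set it still belongs to by
        // joining it with set handle 0 (i.e. "no set").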
2996        self.waitable_join(store, caller_instance, task_id, 0)?;
2997
2998        let (rep, is_host) = store
2999            .handle_table(RuntimeInstance {
3000                instance: self.id().instance(),
3001                index: caller_instance,
3002            })
3003            .subtask_remove(task_id)?;
3004
3005        let concurrent_state = store.concurrent_state_mut();
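        // Determine whether this is a host or guest subtask, which instance
        // created it, and whether it can be deleted immediately (a guest task
        // must also have exited before it can be deleted here).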
3006        let (waitable, expected_caller_instance, delete) = if is_host {
3007            let id = TableId::<HostTask>::new(rep);
3008            let task = concurrent_state.get_mut(id)?;
3009            if task.join_handle.is_some() {
3010                bail!("cannot drop a subtask which has not yet resolved");
3011            }
3012            (Waitable::Host(id), task.caller_instance, true)
3013        } else {
3014            let id = TableId::<GuestTask>::new(rep);
3015            let task = concurrent_state.get_mut(id)?;
3016            if task.lift_result.is_some() {
3017                bail!("cannot drop a subtask which has not yet resolved");
3018            }
3019            if let Caller::Guest { instance, .. } = &task.caller {
3020                (Waitable::Guest(id), *instance, task.exited)
3021            } else {
3022                unreachable!()
3023            }
3024        };
3025
3026        waitable.common(concurrent_state)?.handle = None;
3027
3028        if waitable.take_event(concurrent_state)?.is_some() {
3029            bail!("cannot drop a subtask with an undelivered event");
3030        }
3031
3032        if delete {
3033            waitable.delete_from(concurrent_state)?;
3034        }
3035
3036        // Since waitables can neither be passed between instances nor forged,
3037        // this should never fail unless there's a bug in Wasmtime, but we check
3038        // here to be sure:
3039        assert_eq!(expected_caller_instance, caller_instance);
3040        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3041        Ok(())
3042    }
3043
3044    /// Implements the `waitable-set.wait` intrinsic.
3045    pub(crate) fn waitable_set_wait(
3046        self,
3047        store: &mut StoreOpaque,
3048        caller: RuntimeComponentInstanceIndex,
3049        options: OptionsIndex,
3050        set: u32,
3051        payload: u32,
3052    ) -> Result<u32> {
3053        self.id().get(store).check_may_leave(caller)?;
3054
3055        if !self.options(store, options).async_ {
3056            // The caller may only call `waitable-set.wait` from an async task
3057            // (i.e. a task created via a call to an async export).
3058            // Otherwise, we'll trap.
3059            store.concurrent_state_mut().check_blocking()?;
3060        }
3061
3062        let &CanonicalOptions {
3063            cancellable,
3064            instance: caller_instance,
3065            ..
3066        } = &self.id().get(store).component().env_component().options[options];
3067        let rep = store
3068            .handle_table(RuntimeInstance {
3069                instance: self.id().instance(),
3070                index: caller_instance,
3071            })
3072            .waitable_set_rep(set)?;
3073
3074        self.waitable_check(
3075            store,
3076            cancellable,
3077            WaitableCheck::Wait(WaitableCheckParams {
3078                set: TableId::new(rep),
3079                options,
3080                payload,
3081            }),
3082        )
3083    }
3084
3085    /// Implements the `waitable-set.poll` intrinsic.
3086    pub(crate) fn waitable_set_poll(
3087        self,
3088        store: &mut StoreOpaque,
3089        caller: RuntimeComponentInstanceIndex,
3090        options: OptionsIndex,
3091        set: u32,
3092        payload: u32,
3093    ) -> Result<u32> {
3094        self.id().get(store).check_may_leave(caller)?;
3095
3096        if !self.options(store, options).async_ {
3097            // The caller may only call `waitable-set.poll` from an async task
3098            // (i.e. a task created via a call to an async export).
3099            // Otherwise, we'll trap.
3100            store.concurrent_state_mut().check_blocking()?;
3101        }
3102
3103        let &CanonicalOptions {
3104            cancellable,
3105            instance: caller_instance,
3106            ..
3107        } = &self.id().get(store).component().env_component().options[options];
3108        let rep = store
3109            .handle_table(RuntimeInstance {
3110                instance: self.id().instance(),
3111                index: caller_instance,
3112            })
3113            .waitable_set_rep(set)?;
3114
3115        self.waitable_check(
3116            store,
3117            cancellable,
3118            WaitableCheck::Poll(WaitableCheckParams {
3119                set: TableId::new(rep),
3120                options,
3121                payload,
3122            }),
3123        )
3124    }
3125
3126    /// Implements the `thread.index` intrinsic.
3127    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3128        let thread_id = store
3129            .concurrent_state_mut()
3130            .guest_thread
3131            .ok_or_else(|| anyhow!("no current thread"))?
3132            .thread;
3133        // The unwrap is safe because `instance_rep` must be `Some` by this point
3134        Ok(store
3135            .concurrent_state_mut()
3136            .get_mut(thread_id)?
3137            .instance_rep
3138            .unwrap())
3139    }
3140
3141    /// Implements the `thread.new-indirect` intrinsic.
3142    pub(crate) fn thread_new_indirect<T: 'static>(
3143        self,
3144        mut store: StoreContextMut<T>,
3145        runtime_instance: RuntimeComponentInstanceIndex,
3146        _func_ty_idx: TypeFuncIndex, // currently unused
3147        start_func_table_idx: RuntimeTableIndex,
3148        start_func_idx: u32,
3149        context: i32,
3150    ) -> Result<u32> {
3151        self.id().get(store.0).check_may_leave(runtime_instance)?;
3152
3153        log::trace!("creating new thread");
3154
3155        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3156        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3157        let callee = instance
3158            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3159            .ok_or_else(|| {
3160                anyhow!("the start function index points to an uninitialized function")
3161            })?;
3162        if callee.type_index(store.0) != start_func_ty.type_index() {
3163            bail!(
3164                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3165            );
3166        }
3167
3168        let token = StoreToken::new(store.as_context_mut());
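        // This closure is the body of the new thread: it installs the thread
        // as the current guest thread, runs the guest's start function with
        // `context`, and then tears the thread down once it returns.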
3169        let start_func = Box::new(
3170            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3171                let old_thread = store
3172                    .concurrent_state_mut()
3173                    .guest_thread
3174                    .replace(guest_thread);
3175                log::trace!(
3176                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3177                );
3178
3179                store.maybe_push_call_context(guest_thread.task)?;
3180
3181                let mut store = token.as_context_mut(store);
3182                let mut params = [ValRaw::i32(context)];
3183                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3184                // on a separate fiber if we're running in an async store.
3185                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3186
3187                store.0.maybe_pop_call_context(guest_thread.task)?;
3188
3189                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3190                log::trace!("explicit thread {guest_thread:?} completed");
3191                let state = store.0.concurrent_state_mut();
3192                let task = state.get_mut(guest_thread.task)?;
3193                if task.threads.is_empty() && !task.returned_or_cancelled() {
3194                    bail!(Trap::NoAsyncResult);
3195                }
3196                state.guest_thread = old_thread;
3197                old_thread
3198                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3199                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3200                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3201                }
3202                log::trace!("thread start: restored {old_thread:?} as current thread");
3203
3204                Ok(())
3205            },
3206        );
3207
3208        let state = store.0.concurrent_state_mut();
3209        let current_thread = state.guest_thread.unwrap();
3210        let parent_task = current_thread.task;
3211
3212        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3213        let thread_id = state.push(new_thread)?;
3214        state.get_mut(parent_task)?.threads.insert(thread_id);
3215
3216        log::trace!("new thread with id {thread_id:?} created");
3217
3218        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3219    }
3220
3221    pub(crate) fn resume_suspended_thread(
3222        self,
3223        store: &mut StoreOpaque,
3224        runtime_instance: RuntimeComponentInstanceIndex,
3225        thread_idx: u32,
3226        high_priority: bool,
3227    ) -> Result<()> {
3228        let thread_id =
3229            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3230        let state = store.concurrent_state_mut();
3231        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3232        let thread = state.get_mut(guest_thread.thread)?;
3233
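        // Mark the thread as running and schedule it according to its previous
        // state: a not-yet-started explicit thread gets a fresh guest call,
        // while a suspended thread has its fiber resumed.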
3234        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3235            GuestThreadState::NotStartedExplicit(start_func) => {
3236                log::trace!("starting thread {guest_thread:?}");
3237                let guest_call = WorkItem::GuestCall(GuestCall {
3238                    thread: guest_thread,
3239                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3240                        start_func(store, guest_thread)
3241                    })),
3242                });
3243                store
3244                    .concurrent_state_mut()
3245                    .push_work_item(guest_call, high_priority);
3246            }
3247            GuestThreadState::Suspended(fiber) => {
3248                log::trace!("resuming thread {thread_id:?} that was suspended");
3249                store
3250                    .concurrent_state_mut()
3251                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3252            }
3253            _ => {
3254                bail!("cannot resume thread which is not suspended");
3255            }
3256        }
3257        Ok(())
3258    }
3259
3260    fn add_guest_thread_to_instance_table(
3261        self,
3262        thread_id: TableId<GuestThread>,
3263        store: &mut StoreOpaque,
3264        runtime_instance: RuntimeComponentInstanceIndex,
3265    ) -> Result<u32> {
3266        let guest_id = store
3267            .handle_table(RuntimeInstance {
3268                instance: self.id().instance(),
3269                index: runtime_instance,
3270            })
3271            .guest_thread_insert(thread_id.rep())?;
3272        store
3273            .concurrent_state_mut()
3274            .get_mut(thread_id)?
3275            .instance_rep = Some(guest_id);
3276        Ok(guest_id)
3277    }
3278
3279    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3280    /// and `thread.switch-to` intrinsics.
3281    pub(crate) fn suspension_intrinsic(
3282        self,
3283        store: &mut StoreOpaque,
3284        caller: RuntimeComponentInstanceIndex,
3285        cancellable: bool,
3286        yielding: bool,
3287        to_thread: Option<u32>,
3288    ) -> Result<WaitResult> {
3289        self.id().get(store).check_may_leave(caller)?;
3290
3291        if to_thread.is_none() {
3292            let state = store.concurrent_state_mut();
3293            if yielding {
3294                // This is a `thread.yield` call
3295                if !state.may_block(state.guest_thread.unwrap().task) {
3296                    // The spec defines `thread.yield` to be a no-op in a
3297                    // non-blocking context, so we return immediately without giving
3298                    // any other thread a chance to run.
3299                    return Ok(WaitResult::Completed);
3300                }
3301            } else {
3302                // The caller may only call `thread.suspend` from an async task
3303                // (i.e. a task created via a call to an async export).
3304                // Otherwise, we'll trap.
3305                state.check_blocking()?;
3306            }
3307        }
3308
3309        // There could be a pending cancellation from a previous uncancellable wait.
3310        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3311            return Ok(WaitResult::Cancelled);
3312        }
3313
3314        if let Some(thread) = to_thread {
3315            self.resume_suspended_thread(store, caller, thread, true)?;
3316        }
3317
3318        let state = store.concurrent_state_mut();
3319        let guest_thread = state.guest_thread.unwrap();
3320        let reason = if yielding {
3321            SuspendReason::Yielding {
3322                thread: guest_thread,
3323            }
3324        } else {
3325            SuspendReason::ExplicitlySuspending {
3326                thread: guest_thread,
3327                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3328                // we're handling a `thread.switch-to` call; otherwise it would
3329                // panic if we called it in a non-blocking context.
3330                skip_may_block_check: to_thread.is_some(),
3331            }
3332        };
3333
3334        store.suspend(reason)?;
3335
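        // A cancellation request may have arrived while we were suspended, so
        // check again before reporting completion.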
3336        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3337            Ok(WaitResult::Cancelled)
3338        } else {
3339            Ok(WaitResult::Completed)
3340        }
3341    }
3342
3343    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3344    fn waitable_check(
3345        self,
3346        store: &mut StoreOpaque,
3347        cancellable: bool,
3348        check: WaitableCheck,
3349    ) -> Result<u32> {
3350        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3351
3352        let (wait, set) = match &check {
3353            WaitableCheck::Wait(params) => (true, Some(params.set)),
3354            WaitableCheck::Poll(params) => (false, Some(params.set)),
3355        };
3356
3357        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3358        // First, suspend this fiber, allowing any other threads to run.
3359        store.suspend(SuspendReason::Yielding {
3360            thread: guest_thread,
3361        })?;
3362
3363        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3364
3365        let state = store.concurrent_state_mut();
3366        let task = state.get_mut(guest_thread.task)?;
3367
3368        // If we're waiting, and there are no events immediately available,
3369        // suspend the fiber until that changes.
3370        if wait {
3371            let set = set.unwrap();
3372
3373            if (task.event.is_none()
3374                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3375                && state.get_mut(set)?.ready.is_empty()
3376            {
3377                if cancellable {
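                    // Record which set this thread is waiting on so that a
                    // cancellation request (see `subtask_cancel`) can find and
                    // wake it.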
3378                    let old = state
3379                        .get_mut(guest_thread.thread)?
3380                        .wake_on_cancel
3381                        .replace(set);
3382                    assert!(old.is_none());
3383                }
3384
3385                store.suspend(SuspendReason::Waiting {
3386                    set,
3387                    thread: guest_thread,
3388                    skip_may_block_check: false,
3389                })?;
3390            }
3391        }
3392
3393        log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3394
3395        let result = match check {
3396            // Deliver any pending events to the guest and return.
3397            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3398                let event =
3399                    self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3400
3401                let (ordinal, handle, result) = if wait {
3402                    let (event, waitable) = event.unwrap();
3403                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3404                    let (ordinal, result) = event.parts();
3405                    (ordinal, handle, result)
3406                } else {
3407                    if let Some((event, waitable)) = event {
3408                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3409                        let (ordinal, result) = event.parts();
3410                        (ordinal, handle, result)
3411                    } else {
3412                        log::trace!(
3413                            "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3414                            guest_thread.task,
3415                            params.set
3416                        );
3417                        let (ordinal, result) = Event::None.parts();
3418                        (ordinal, 0, result)
3419                    }
3420                };
3421                let memory = self.options_memory_mut(store, params.options);
3422                let ptr = func::validate_inbounds_dynamic(
3423                    &CanonicalAbiInfo::POINTER_PAIR,
3424                    memory,
3425                    &ValRaw::u32(params.payload),
3426                )?;
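                // Write the (waitable handle, event payload) pair into guest
                // memory at `ptr` as two little-endian u32s.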
3427                memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3428                memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3429                Ok(ordinal)
3430            }
3431        };
3432
3433        result
3434    }
3435
3436    /// Implements the `subtask.cancel` intrinsic.
3437    pub(crate) fn subtask_cancel(
3438        self,
3439        store: &mut StoreOpaque,
3440        caller_instance: RuntimeComponentInstanceIndex,
3441        async_: bool,
3442        task_id: u32,
3443    ) -> Result<u32> {
3444        self.id().get(store).check_may_leave(caller_instance)?;
3445
3446        if !async_ {
3447            // The caller may only make a synchronous `subtask.cancel` call from
3448            // an async task (i.e. a task created via a call to an async export).
3449            // Otherwise, we'll trap.
3450            store.concurrent_state_mut().check_blocking()?;
3451        }
3452
3453        let (rep, is_host) = store
3454            .handle_table(RuntimeInstance {
3455                instance: self.id().instance(),
3456                index: caller_instance,
3457            })
3458            .subtask_rep(task_id)?;
3459        let (waitable, expected_caller_instance) = if is_host {
3460            let id = TableId::<HostTask>::new(rep);
3461            (
3462                Waitable::Host(id),
3463                store.concurrent_state_mut().get_mut(id)?.caller_instance,
3464            )
3465        } else {
3466            let id = TableId::<GuestTask>::new(rep);
3467            if let Caller::Guest { instance, .. } =
3468                &store.concurrent_state_mut().get_mut(id)?.caller
3469            {
3470                (Waitable::Guest(id), *instance)
3471            } else {
3472                unreachable!()
3473            }
3474        };
3475        // Since waitables can neither be passed between instances nor forged,
3476        // this should never fail unless there's a bug in Wasmtime, but we check
3477        // here to be sure:
3478        assert_eq!(expected_caller_instance, caller_instance);
3479
3480        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3481
3482        let concurrent_state = store.concurrent_state_mut();
3483        if let Waitable::Host(host_task) = waitable {
3484            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3485                handle.abort();
3486                return Ok(Status::ReturnCancelled as u32);
3487            }
3488        } else {
3489            let caller = concurrent_state.guest_thread.unwrap();
3490            let guest_task = TableId::<GuestTask>::new(rep);
3491            let task = concurrent_state.get_mut(guest_task)?;
3492            if !task.already_lowered_parameters() {
3493                task.lower_params = None;
3494                task.lift_result = None;
3495
3496                // Not yet started; cancel and remove from pending
3497                let instance = task.instance;
3498                let pending = &mut store.instance_state(instance).pending;
3499                let pending_count = pending.len();
3500                pending.retain(|thread, _| thread.task != guest_task);
3501                // If there were no pending threads for this task, we're in an error state
3502                if pending.len() == pending_count {
3503                    bail!("`subtask.cancel` called after terminal status delivered");
3504                }
3505                return Ok(Status::StartCancelled as u32);
3506            } else if !task.returned_or_cancelled() {
3507                // Started, but not yet returned or cancelled; send the
3508                // `CANCELLED` event
3509                task.cancel_sent = true;
3510                // Note that this might overwrite an event that was set earlier
3511                // (e.g. `Event::None` if the task is yielding, or
3512                // `Event::Cancelled` if it was already cancelled), but that's
3513                // okay -- this should supersede the previous state.
3514                task.event = Some(Event::Cancelled);
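                // If one of the task's threads is parked in a cancellable
                // wait, wake it immediately so it can observe the cancellation,
                // then yield so it gets a chance to run.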
3515                for thread in task.threads.clone() {
3516                    let thread = QualifiedThreadId {
3517                        task: guest_task,
3518                        thread,
3519                    };
3520                    if let Some(set) = concurrent_state
3521                        .get_mut(thread.thread)
3522                        .unwrap()
3523                        .wake_on_cancel
3524                        .take()
3525                    {
3526                        let item = match concurrent_state
3527                            .get_mut(set)?
3528                            .waiting
3529                            .remove(&thread)
3530                            .unwrap()
3531                        {
3532                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3533                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3534                                thread,
3535                                kind: GuestCallKind::DeliverEvent {
3536                                    instance,
3537                                    set: None,
3538                                },
3539                            }),
3540                        };
3541                        concurrent_state.push_high_priority(item);
3542
3543                        store.suspend(SuspendReason::Yielding { thread: caller })?;
3544                        break;
3545                    }
3546                }
3547
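                // The cancellation hasn't resolved yet: an async caller gets
                // `BLOCKED` back, while a sync caller blocks here until the
                // subtask reaches a terminal state.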
3548                let concurrent_state = store.concurrent_state_mut();
3549                let task = concurrent_state.get_mut(guest_task)?;
3550                if !task.returned_or_cancelled() {
3551                    if async_ {
3552                        return Ok(BLOCKED);
3553                    } else {
3554                        store.wait_for_event(Waitable::Guest(guest_task))?;
3555                    }
3556                }
3557            }
3558        }
3559
3560        let event = waitable.take_event(store.concurrent_state_mut())?;
3561        if let Some(Event::Subtask {
3562            status: status @ (Status::Returned | Status::ReturnCancelled),
3563        }) = event
3564        {
3565            Ok(status as u32)
3566        } else {
3567            bail!("`subtask.cancel` called after terminal status delivered");
3568        }
3569    }
3570
3571    pub(crate) fn context_get(
3572        self,
3573        store: &mut StoreOpaque,
3574        caller: RuntimeComponentInstanceIndex,
3575        slot: u32,
3576    ) -> Result<u32> {
3577        self.id().get(store).check_may_leave(caller)?;
3578        store.concurrent_state_mut().context_get(slot)
3579    }
3580
3581    pub(crate) fn context_set(
3582        self,
3583        store: &mut StoreOpaque,
3584        caller: RuntimeComponentInstanceIndex,
3585        slot: u32,
3586        value: u32,
3587    ) -> Result<()> {
3588        self.id().get(store).check_may_leave(caller)?;
3589        store.concurrent_state_mut().context_set(slot, value)
3590    }
3591}
3592
3593/// Trait representing component model ABI async intrinsics and fused adapter
3594/// helper functions.
3595///
3596/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3597/// which must be valid for at least the duration of the call (and possibly for
3598/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3599/// pointers used for async calls).
3600pub trait VMComponentAsyncStore {
3601    /// A helper function for fused adapter modules involving calls where at
3602    /// least one of the caller or callee is async.
3603    ///
3604    /// This helper is not used when the caller and callee both use the sync
3605    /// ABI; it is only used when at least one of them is async.
3606    unsafe fn prepare_call(
3607        &mut self,
3608        instance: Instance,
3609        memory: *mut VMMemoryDefinition,
3610        start: *mut VMFuncRef,
3611        return_: *mut VMFuncRef,
3612        caller_instance: RuntimeComponentInstanceIndex,
3613        callee_instance: RuntimeComponentInstanceIndex,
3614        task_return_type: TypeTupleIndex,
3615        callee_async: bool,
3616        string_encoding: u8,
3617        result_count: u32,
3618        storage: *mut ValRaw,
3619        storage_len: usize,
3620    ) -> Result<()>;
3621
3622    /// A helper function for fused adapter modules involving calls where the
3623    /// caller is sync-lowered but the callee is async-lifted.
3624    unsafe fn sync_start(
3625        &mut self,
3626        instance: Instance,
3627        callback: *mut VMFuncRef,
3628        callee: *mut VMFuncRef,
3629        param_count: u32,
3630        storage: *mut MaybeUninit<ValRaw>,
3631        storage_len: usize,
3632    ) -> Result<()>;
3633
3634    /// A helper function for fused adapter modules involving calls where the
3635    /// caller is async-lowered.
3636    unsafe fn async_start(
3637        &mut self,
3638        instance: Instance,
3639        callback: *mut VMFuncRef,
3640        post_return: *mut VMFuncRef,
3641        callee: *mut VMFuncRef,
3642        param_count: u32,
3643        result_count: u32,
3644        flags: u32,
3645    ) -> Result<u32>;
3646
3647    /// The `future.write` intrinsic.
3648    fn future_write(
3649        &mut self,
3650        instance: Instance,
3651        caller: RuntimeComponentInstanceIndex,
3652        ty: TypeFutureTableIndex,
3653        options: OptionsIndex,
3654        future: u32,
3655        address: u32,
3656    ) -> Result<u32>;
3657
3658    /// The `future.read` intrinsic.
3659    fn future_read(
3660        &mut self,
3661        instance: Instance,
3662        caller: RuntimeComponentInstanceIndex,
3663        ty: TypeFutureTableIndex,
3664        options: OptionsIndex,
3665        future: u32,
3666        address: u32,
3667    ) -> Result<u32>;
3668
3669    /// The `future.drop-writable` intrinsic.
3670    fn future_drop_writable(
3671        &mut self,
3672        instance: Instance,
3673        caller: RuntimeComponentInstanceIndex,
3674        ty: TypeFutureTableIndex,
3675        writer: u32,
3676    ) -> Result<()>;
3677
3678    /// The `stream.write` intrinsic.
3679    fn stream_write(
3680        &mut self,
3681        instance: Instance,
3682        caller: RuntimeComponentInstanceIndex,
3683        ty: TypeStreamTableIndex,
3684        options: OptionsIndex,
3685        stream: u32,
3686        address: u32,
3687        count: u32,
3688    ) -> Result<u32>;
3689
3690    /// The `stream.read` intrinsic.
3691    fn stream_read(
3692        &mut self,
3693        instance: Instance,
3694        caller: RuntimeComponentInstanceIndex,
3695        ty: TypeStreamTableIndex,
3696        options: OptionsIndex,
3697        stream: u32,
3698        address: u32,
3699        count: u32,
3700    ) -> Result<u32>;
3701
3702    /// The "fast-path" implementation of the `stream.write` intrinsic for
3703    /// "flat" (i.e. memcpy-able) payloads.
3704    fn flat_stream_write(
3705        &mut self,
3706        instance: Instance,
3707        caller: RuntimeComponentInstanceIndex,
3708        ty: TypeStreamTableIndex,
3709        options: OptionsIndex,
3710        payload_size: u32,
3711        payload_align: u32,
3712        stream: u32,
3713        address: u32,
3714        count: u32,
3715    ) -> Result<u32>;
3716
3717    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3718    /// (i.e. memcpy-able) payloads.
3719    fn flat_stream_read(
3720        &mut self,
3721        instance: Instance,
3722        caller: RuntimeComponentInstanceIndex,
3723        ty: TypeStreamTableIndex,
3724        options: OptionsIndex,
3725        payload_size: u32,
3726        payload_align: u32,
3727        stream: u32,
3728        address: u32,
3729        count: u32,
3730    ) -> Result<u32>;
3731
3732    /// The `stream.drop-writable` intrinsic.
3733    fn stream_drop_writable(
3734        &mut self,
3735        instance: Instance,
3736        caller: RuntimeComponentInstanceIndex,
3737        ty: TypeStreamTableIndex,
3738        writer: u32,
3739    ) -> Result<()>;
3740
3741    /// The `error-context.debug-message` intrinsic.
3742    fn error_context_debug_message(
3743        &mut self,
3744        instance: Instance,
3745        caller: RuntimeComponentInstanceIndex,
3746        ty: TypeComponentLocalErrorContextTableIndex,
3747        options: OptionsIndex,
3748        err_ctx_handle: u32,
3749        debug_msg_address: u32,
3750    ) -> Result<()>;
3751
3752    /// The `thread.new-indirect` intrinsic.
3753    fn thread_new_indirect(
3754        &mut self,
3755        instance: Instance,
3756        caller: RuntimeComponentInstanceIndex,
3757        func_ty_idx: TypeFuncIndex,
3758        start_func_table_idx: RuntimeTableIndex,
3759        start_func_idx: u32,
3760        context: i32,
3761    ) -> Result<u32>;
3762}
3763
3764/// SAFETY: See trait docs.
3765impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3766    unsafe fn prepare_call(
3767        &mut self,
3768        instance: Instance,
3769        memory: *mut VMMemoryDefinition,
3770        start: *mut VMFuncRef,
3771        return_: *mut VMFuncRef,
3772        caller_instance: RuntimeComponentInstanceIndex,
3773        callee_instance: RuntimeComponentInstanceIndex,
3774        task_return_type: TypeTupleIndex,
3775        callee_async: bool,
3776        string_encoding: u8,
3777        result_count_or_max_if_async: u32,
3778        storage: *mut ValRaw,
3779        storage_len: usize,
3780    ) -> Result<()> {
3781        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3782        // this method will have ensured that `storage` is a valid
3783        // pointer containing at least `storage_len` items.
3784        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3785
3786        unsafe {
3787            instance.prepare_call(
3788                StoreContextMut(self),
3789                start,
3790                return_,
3791                caller_instance,
3792                callee_instance,
3793                task_return_type,
3794                callee_async,
3795                memory,
3796                string_encoding,
3797                match result_count_or_max_if_async {
3798                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3799                        params,
3800                        has_result: false,
3801                    },
3802                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3803                        params,
3804                        has_result: true,
3805                    },
3806                    result_count => CallerInfo::Sync {
3807                        params,
3808                        result_count,
3809                    },
3810                },
3811            )
3812        }
3813    }
3814
3815    unsafe fn sync_start(
3816        &mut self,
3817        instance: Instance,
3818        callback: *mut VMFuncRef,
3819        callee: *mut VMFuncRef,
3820        param_count: u32,
3821        storage: *mut MaybeUninit<ValRaw>,
3822        storage_len: usize,
3823    ) -> Result<()> {
3824        unsafe {
3825            instance
3826                .start_call(
3827                    StoreContextMut(self),
3828                    callback,
3829                    ptr::null_mut(),
3830                    callee,
3831                    param_count,
3832                    1,
3833                    START_FLAG_ASYNC_CALLEE,
3834                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3835                    // this method will have ensured that `storage` is a valid
3836                    // pointer containing at least `storage_len` items.
3837                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3838                )
3839                .map(drop)
3840        }
3841    }
3842
3843    unsafe fn async_start(
3844        &mut self,
3845        instance: Instance,
3846        callback: *mut VMFuncRef,
3847        post_return: *mut VMFuncRef,
3848        callee: *mut VMFuncRef,
3849        param_count: u32,
3850        result_count: u32,
3851        flags: u32,
3852    ) -> Result<u32> {
3853        unsafe {
3854            instance.start_call(
3855                StoreContextMut(self),
3856                callback,
3857                post_return,
3858                callee,
3859                param_count,
3860                result_count,
3861                flags,
3862                None,
3863            )
3864        }
3865    }
3866
3867    fn future_write(
3868        &mut self,
3869        instance: Instance,
3870        caller: RuntimeComponentInstanceIndex,
3871        ty: TypeFutureTableIndex,
3872        options: OptionsIndex,
3873        future: u32,
3874        address: u32,
3875    ) -> Result<u32> {
3876        instance.id().get(self).check_may_leave(caller)?;
3877        instance
3878            .guest_write(
3879                StoreContextMut(self),
3880                caller,
3881                TransmitIndex::Future(ty),
3882                options,
3883                None,
3884                future,
3885                address,
3886                1,
3887            )
3888            .map(|result| result.encode())
3889    }
3890
3891    fn future_read(
3892        &mut self,
3893        instance: Instance,
3894        caller: RuntimeComponentInstanceIndex,
3895        ty: TypeFutureTableIndex,
3896        options: OptionsIndex,
3897        future: u32,
3898        address: u32,
3899    ) -> Result<u32> {
3900        instance.id().get(self).check_may_leave(caller)?;
3901        instance
3902            .guest_read(
3903                StoreContextMut(self),
3904                caller,
3905                TransmitIndex::Future(ty),
3906                options,
3907                None,
3908                future,
3909                address,
3910                1,
3911            )
3912            .map(|result| result.encode())
3913    }
3914
3915    fn stream_write(
3916        &mut self,
3917        instance: Instance,
3918        caller: RuntimeComponentInstanceIndex,
3919        ty: TypeStreamTableIndex,
3920        options: OptionsIndex,
3921        stream: u32,
3922        address: u32,
3923        count: u32,
3924    ) -> Result<u32> {
3925        instance.id().get(self).check_may_leave(caller)?;
3926        instance
3927            .guest_write(
3928                StoreContextMut(self),
3929                caller,
3930                TransmitIndex::Stream(ty),
3931                options,
3932                None,
3933                stream,
3934                address,
3935                count,
3936            )
3937            .map(|result| result.encode())
3938    }
3939
3940    fn stream_read(
3941        &mut self,
3942        instance: Instance,
3943        caller: RuntimeComponentInstanceIndex,
3944        ty: TypeStreamTableIndex,
3945        options: OptionsIndex,
3946        stream: u32,
3947        address: u32,
3948        count: u32,
3949    ) -> Result<u32> {
3950        instance.id().get(self).check_may_leave(caller)?;
3951        instance
3952            .guest_read(
3953                StoreContextMut(self),
3954                caller,
3955                TransmitIndex::Stream(ty),
3956                options,
3957                None,
3958                stream,
3959                address,
3960                count,
3961            )
3962            .map(|result| result.encode())
3963    }
3964
3965    fn future_drop_writable(
3966        &mut self,
3967        instance: Instance,
3968        caller: RuntimeComponentInstanceIndex,
3969        ty: TypeFutureTableIndex,
3970        writer: u32,
3971    ) -> Result<()> {
3972        instance.id().get(self).check_may_leave(caller)?;
3973        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3974    }
3975
3976    fn flat_stream_write(
3977        &mut self,
3978        instance: Instance,
3979        caller: RuntimeComponentInstanceIndex,
3980        ty: TypeStreamTableIndex,
3981        options: OptionsIndex,
3982        payload_size: u32,
3983        payload_align: u32,
3984        stream: u32,
3985        address: u32,
3986        count: u32,
3987    ) -> Result<u32> {
3988        instance.id().get(self).check_may_leave(caller)?;
3989        instance
3990            .guest_write(
3991                StoreContextMut(self),
3992                caller,
3993                TransmitIndex::Stream(ty),
3994                options,
3995                Some(FlatAbi {
3996                    size: payload_size,
3997                    align: payload_align,
3998                }),
3999                stream,
4000                address,
4001                count,
4002            )
4003            .map(|result| result.encode())
4004    }
4005
4006    fn flat_stream_read(
4007        &mut self,
4008        instance: Instance,
4009        caller: RuntimeComponentInstanceIndex,
4010        ty: TypeStreamTableIndex,
4011        options: OptionsIndex,
4012        payload_size: u32,
4013        payload_align: u32,
4014        stream: u32,
4015        address: u32,
4016        count: u32,
4017    ) -> Result<u32> {
4018        instance.id().get(self).check_may_leave(caller)?;
4019        instance
4020            .guest_read(
4021                StoreContextMut(self),
4022                caller,
4023                TransmitIndex::Stream(ty),
4024                options,
4025                Some(FlatAbi {
4026                    size: payload_size,
4027                    align: payload_align,
4028                }),
4029                stream,
4030                address,
4031                count,
4032            )
4033            .map(|result| result.encode())
4034    }
4035
4036    fn stream_drop_writable(
4037        &mut self,
4038        instance: Instance,
4039        caller: RuntimeComponentInstanceIndex,
4040        ty: TypeStreamTableIndex,
4041        writer: u32,
4042    ) -> Result<()> {
4043        instance.id().get(self).check_may_leave(caller)?;
4044        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4045    }
4046
4047    fn error_context_debug_message(
4048        &mut self,
4049        instance: Instance,
4050        caller: RuntimeComponentInstanceIndex,
4051        ty: TypeComponentLocalErrorContextTableIndex,
4052        options: OptionsIndex,
4053        err_ctx_handle: u32,
4054        debug_msg_address: u32,
4055    ) -> Result<()> {
4056        instance.id().get(self).check_may_leave(caller)?;
4057        instance.error_context_debug_message(
4058            StoreContextMut(self),
4059            ty,
4060            options,
4061            err_ctx_handle,
4062            debug_msg_address,
4063        )
4064    }
4065
4066    fn thread_new_indirect(
4067        &mut self,
4068        instance: Instance,
4069        caller: RuntimeComponentInstanceIndex,
4070        func_ty_idx: TypeFuncIndex,
4071        start_func_table_idx: RuntimeTableIndex,
4072        start_func_idx: u32,
4073        context: i32,
4074    ) -> Result<u32> {
4075        instance.thread_new_indirect(
4076            StoreContextMut(self),
4077            caller,
4078            func_ty_idx,
4079            start_func_table_idx,
4080            start_func_idx,
4081            context,
4082        )
4083    }
4084}
4085
4086type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4087
4088/// Represents the state of a pending host task.
4089struct HostTask {
4090    common: WaitableCommon,
4091    caller_instance: RuntimeComponentInstanceIndex,
4092    join_handle: Option<JoinHandle>,
4093}
4094
4095impl HostTask {
4096    fn new(
4097        caller_instance: RuntimeComponentInstanceIndex,
4098        join_handle: Option<JoinHandle>,
4099    ) -> Self {
4100        Self {
4101            common: WaitableCommon::default(),
4102            caller_instance,
4103            join_handle,
4104        }
4105    }
4106}
4107
4108impl TableDebug for HostTask {
4109    fn type_name() -> &'static str {
4110        "HostTask"
4111    }
4112}
4113
4114type CallbackFn = Box<
4115    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
4116        + Send
4117        + Sync
4118        + 'static,
4119>;
4120
4121/// Represents the caller of a given guest task.
4122enum Caller {
4123    /// The host called the guest task.
4124    Host {
4125        /// If present, may be used to deliver the result.
4126        tx: Option<oneshot::Sender<LiftedResult>>,
4127        /// Channel to notify once all subtasks spawned by this caller have
4128        /// completed.
4129        ///
4130        /// Note that we'll never actually send anything to this channel;
4131        /// dropping it when the refcount goes to zero is sufficient to notify
4132        /// the receiver.
4133        exit_tx: Arc<oneshot::Sender<()>>,
4134        /// If true, there's a host future that must be dropped before the task
4135        /// can be deleted.
4136        host_future_present: bool,
4137        /// If true, call `post-return` function (if any) automatically.
4138        call_post_return_automatically: bool,
4139    },
4140    /// Another guest thread called the guest task.
4141    Guest {
4142        /// The id of the calling guest thread.
4143        thread: QualifiedThreadId,
4144        /// The instance to use to enforce reentrance rules.
4145        ///
4146        /// Note that this might not be the same as the instance the caller task
4147        /// started executing in given that one or more synchronous guest->guest
4148        /// calls may have occurred involving multiple instances.
4149        instance: RuntimeComponentInstanceIndex,
4150    },
4151}
4152
4153/// Represents a closure and related canonical ABI parameters required to
4154/// validate a `task.return` call at runtime and lift the result.
4155struct LiftResult {
4156    lift: RawLift,
4157    ty: TypeTupleIndex,
4158    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4159    string_encoding: StringEncoding,
4160}
4161
4162/// The table ID for a guest thread, qualified by the task to which it belongs.
4163///
4164/// This exists to minimize table lookups and avoid having to pass stores around mutably
4165/// for the common case of identifying the task to which a thread belongs.
4166#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4167struct QualifiedThreadId {
4168    task: TableId<GuestTask>,
4169    thread: TableId<GuestThread>,
4170}
4171
4172impl QualifiedThreadId {
4173    fn qualify(
4174        state: &mut ConcurrentState,
4175        thread: TableId<GuestThread>,
4176    ) -> Result<QualifiedThreadId> {
4177        Ok(QualifiedThreadId {
4178            task: state.get_mut(thread)?.parent_task,
4179            thread,
4180        })
4181    }
4182}
4183
4184impl fmt::Debug for QualifiedThreadId {
4185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4186        f.debug_tuple("QualifiedThreadId")
4187            .field(&self.task.rep())
4188            .field(&self.thread.rep())
4189            .finish()
4190    }
4191}
4192
/// The execution state of a guest thread; see `GuestThread::state`.
4193enum GuestThreadState {
4194    NotStartedImplicit,
4195    NotStartedExplicit(
4196        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4197    ),
4198    Running,
4199    Suspended(StoreFiber<'static>),
4200    Pending,
4201    Completed,
4202}

/// Represents a single guest thread belonging to a `GuestTask`.
4203pub struct GuestThread {
4204    /// Context-local state used to implement the `context.{get,set}`
4205    /// intrinsics.
4206    context: [u32; 2],
4207    /// The owning guest task.
4208    parent_task: TableId<GuestTask>,
4209    /// If present, indicates that the thread is currently waiting on the
4210    /// specified set but may be cancelled and woken immediately.
4211    wake_on_cancel: Option<TableId<WaitableSet>>,
4212    /// The execution state of this guest thread
4213    state: GuestThreadState,
4214    /// The index of this thread in the component instance's handle table.
4215    /// This must always be `Some` after initialization.
4216    instance_rep: Option<u32>,
4217}
4218
4219impl GuestThread {
4220    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4221    /// handle.
4222    fn from_instance(
4223        state: Pin<&mut ComponentInstance>,
4224        caller_instance: RuntimeComponentInstanceIndex,
4225        guest_thread: u32,
4226    ) -> Result<TableId<Self>> {
4227        let rep = state.instance_states().0[caller_instance]
4228            .handle_table()
4229            .guest_thread_rep(guest_thread)?;
4230        Ok(TableId::new(rep))
4231    }
4232
4233    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4234        Self {
4235            context: [0; 2],
4236            parent_task,
4237            wake_on_cancel: None,
4238            state: GuestThreadState::NotStartedImplicit,
4239            instance_rep: None,
4240        }
4241    }
4242
4243    fn new_explicit(
4244        parent_task: TableId<GuestTask>,
4245        start_func: Box<
4246            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4247        >,
4248    ) -> Self {
4249        Self {
4250            context: [0; 2],
4251            parent_task,
4252            wake_on_cancel: None,
4253            state: GuestThreadState::NotStartedExplicit(start_func),
4254            instance_rep: None,
4255        }
4256    }
4257}
4258
4259impl TableDebug for GuestThread {
4260    fn type_name() -> &'static str {
4261        "GuestThread"
4262    }
4263}
4264
4265enum SyncResult {
4266    NotProduced,
4267    Produced(Option<ValRaw>),
4268    Taken,
4269}
4270
4271impl SyncResult {
4272    fn take(&mut self) -> Option<Option<ValRaw>> {
4273        match mem::replace(self, SyncResult::Taken) {
4274            SyncResult::NotProduced => None,
4275            SyncResult::Produced(val) => Some(val),
4276            SyncResult::Taken => {
4277                panic!("attempted to take a synchronous result that was already taken")
4278            }
4279        }
4280    }
4281}
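
// A minimal sketch of the intended lifecycle (illustrative only; this type is
// internal to the module):
//
//     let mut result = SyncResult::Produced(None);
//     assert!(matches!(result.take(), Some(None))); // the value is handed out once
//     // `result` is now `SyncResult::Taken`; taking again would panic, while
//     // taking from `SyncResult::NotProduced` simply returns `None`.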
4282
4283#[derive(Debug)]
4284enum HostFutureState {
4285    NotApplicable,
4286    Live,
4287    Dropped,
4288}
4289
4290/// Represents a pending guest task.
4291pub(crate) struct GuestTask {
4292    /// See `WaitableCommon`
4293    common: WaitableCommon,
4294    /// Closure to lower the parameters passed to this task.
4295    lower_params: Option<RawLower>,
4296    /// See `LiftResult`
4297    lift_result: Option<LiftResult>,
4298    /// A place to stash the type-erased lifted result if it can't be delivered
4299    /// immediately.
4300    result: Option<LiftedResult>,
4301    /// Closure to call the callback function for an async-lifted export, if
4302    /// provided.
4303    callback: Option<CallbackFn>,
4304    /// See `Caller`
4305    caller: Caller,
4306    /// A place to stash the call context for managing resource borrows while
4307    /// switching between guest tasks.
4308    call_context: Option<CallContext>,
4309    /// A place to stash the lowered result for a sync-to-async call until it
4310    /// can be returned to the caller.
4311    sync_result: SyncResult,
4312    /// Whether or not the task has been cancelled (i.e. whether the task is
4313    /// permitted to call `task.cancel`).
4314    cancel_sent: bool,
4315    /// Whether or not we've sent a `Status::Starting` event to any current or
4316    /// future waiters for this waitable.
4317    starting_sent: bool,
4318    /// Pending guest subtasks created by this task (directly or indirectly).
4319    ///
4320    /// This is used to re-parent subtasks which are still running when their
4321    /// parent task is disposed.
4322    subtasks: HashSet<TableId<GuestTask>>,
4323    /// Scratch waitable set used to watch subtasks during synchronous calls.
4324    sync_call_set: TableId<WaitableSet>,
4325    /// The runtime instance to which the exported function for this guest task
4326    /// belongs.
4327    ///
4328    /// Note that the task may do a sync->sync call via a fused adapter which
4329    /// results in that task executing code in a different instance, and it may
4330    /// call host functions and intrinsics from that other instance.
4331    instance: RuntimeInstance,
4332    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4333    /// delivered to this task.
4334    event: Option<Event>,
4335    /// The `ExportIndex` of the guest function being called, if known.
4336    function_index: Option<ExportIndex>,
4337    /// Whether or not the task has exited.
4338    exited: bool,
4339    /// Threads belonging to this task
4340    threads: HashSet<TableId<GuestThread>>,
4341    /// The state of the host future that represents an async task, which must
4342    /// be dropped before we can delete the task.
4343    host_future_state: HostFutureState,
4344    /// Indicates whether this task was created for a call to an async-lifted
4345    /// export.
4346    async_function: bool,
4347}
4348
4349impl GuestTask {
4350    fn already_lowered_parameters(&self) -> bool {
4351        // We reset `lower_params` after we lower the parameters
4352        self.lower_params.is_none()
4353    }
4354
4355    fn returned_or_cancelled(&self) -> bool {
4356        // We reset `lift_result` after we return or exit
4357        self.lift_result.is_none()
4358    }
4359
4360    fn ready_to_delete(&self) -> bool {
4361        let threads_completed = self.threads.is_empty();
4362        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4363        let pending_completion_event = matches!(
4364            self.common.event,
4365            Some(Event::Subtask {
4366                status: Status::Returned | Status::ReturnCancelled
4367            })
4368        );
4369        let ready = threads_completed
4370            && !has_sync_result
4371            && !pending_completion_event
4372            && !matches!(self.host_future_state, HostFutureState::Live);
4373        log::trace!(
4374            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4375            threads_completed,
4376            has_sync_result,
4377            pending_completion_event,
4378            self.host_future_state
4379        );
4380        ready
4381    }
4382
4383    fn new(
4384        state: &mut ConcurrentState,
4385        lower_params: RawLower,
4386        lift_result: LiftResult,
4387        caller: Caller,
4388        callback: Option<CallbackFn>,
4389        component_instance: Instance,
4390        instance: RuntimeComponentInstanceIndex,
4391        async_function: bool,
4392    ) -> Result<Self> {
4393        let sync_call_set = state.push(WaitableSet::default())?;
4394        let host_future_state = match &caller {
4395            Caller::Guest { .. } => HostFutureState::NotApplicable,
4396            Caller::Host {
4397                host_future_present,
4398                ..
4399            } => {
4400                if *host_future_present {
4401                    HostFutureState::Live
4402                } else {
4403                    HostFutureState::NotApplicable
4404                }
4405            }
4406        };
4407        Ok(Self {
4408            common: WaitableCommon::default(),
4409            lower_params: Some(lower_params),
4410            lift_result: Some(lift_result),
4411            result: None,
4412            callback,
4413            caller,
4414            call_context: Some(CallContext::default()),
4415            sync_result: SyncResult::NotProduced,
4416            cancel_sent: false,
4417            starting_sent: false,
4418            subtasks: HashSet::new(),
4419            sync_call_set,
4420            instance: RuntimeInstance {
4421                instance: component_instance.id().instance(),
4422                index: instance,
4423            },
4424            event: None,
4425            function_index: None,
4426            exited: false,
4427            threads: HashSet::new(),
4428            host_future_state,
4429            async_function,
4430        })
4431    }
4432
4433    /// Dispose of this guest task, reparenting any pending subtasks to the
4434    /// caller.
4435    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4436        // If there are not-yet-delivered completion events for subtasks in
4437        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4438        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4439            if let Some(Event::Subtask {
4440                status: Status::Returned | Status::ReturnCancelled,
4441            }) = waitable.common(state)?.event
4442            {
4443                waitable.delete_from(state)?;
4444            }
4445        }
4446
4447        assert!(self.threads.is_empty());
4448
4449        state.delete(self.sync_call_set)?;
4450
4451        // Reparent any pending subtasks to the caller.
4452        match &self.caller {
4453            Caller::Guest {
4454                thread,
4455                instance: runtime_instance,
4456            } => {
4457                let task_mut = state.get_mut(thread.task)?;
4458                let present = task_mut.subtasks.remove(&me);
4459                assert!(present);
4460
4461                for subtask in &self.subtasks {
4462                    task_mut.subtasks.insert(*subtask);
4463                }
4464
4465                for subtask in &self.subtasks {
4466                    state.get_mut(*subtask)?.caller = Caller::Guest {
4467                        thread: *thread,
4468                        instance: *runtime_instance,
4469                    };
4470                }
4471            }
4472            Caller::Host { exit_tx, .. } => {
4473                for subtask in &self.subtasks {
4474                    state.get_mut(*subtask)?.caller = Caller::Host {
4475                        tx: None,
4476                        // Clone `exit_tx` to ensure that it is only dropped
4477                        // once all transitive subtasks of the host call have
4478                        // exited:
4479                        exit_tx: exit_tx.clone(),
4480                        host_future_present: false,
4481                        call_post_return_automatically: true,
4482                    };
4483                }
4484            }
4485        }
4486
4487        for subtask in self.subtasks {
4488            if state.get_mut(subtask)?.exited {
4489                Waitable::Guest(subtask).delete_from(state)?;
4490            }
4491        }
4492
4493        Ok(())
4494    }
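
    // For example, if guest task A calls guest task B, and B starts subtask C
    // which is still pending when B is disposed, the code above re-parents C to
    // A (or, when B was called directly by the host, hands C a clone of
    // `exit_tx`) so that completion accounting for B's caller still waits on C.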
4495
4496    fn call_post_return_automatically(&self) -> bool {
4497        matches!(
4498            self.caller,
4499            Caller::Guest { .. }
4500                | Caller::Host {
4501                    call_post_return_automatically: true,
4502                    ..
4503                }
4504        )
4505    }
4506}
4507
4508impl TableDebug for GuestTask {
4509    fn type_name() -> &'static str {
4510        "GuestTask"
4511    }
4512}
4513
4514/// Represents state common to all kinds of waitables.
4515#[derive(Default)]
4516struct WaitableCommon {
4517    /// The currently pending event for this waitable, if any.
4518    event: Option<Event>,
4519    /// The set to which this waitable belongs, if any.
4520    set: Option<TableId<WaitableSet>>,
4521    /// The handle with which the guest refers to this waitable, if any.
4522    handle: Option<u32>,
4523}
4524
4525/// Represents a Component Model Async `waitable`.
4526#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4527enum Waitable {
4528    /// A host task
4529    Host(TableId<HostTask>),
4530    /// A guest task
4531    Guest(TableId<GuestTask>),
4532    /// The read or write end of a stream or future
4533    Transmit(TableId<TransmitHandle>),
4534}
4535
4536impl Waitable {
4537    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4538    /// handle.
4539    fn from_instance(
4540        state: Pin<&mut ComponentInstance>,
4541        caller_instance: RuntimeComponentInstanceIndex,
4542        waitable: u32,
4543    ) -> Result<Self> {
4544        use crate::runtime::vm::component::Waitable;
4545
4546        let (waitable, kind) = state.instance_states().0[caller_instance]
4547            .handle_table()
4548            .waitable_rep(waitable)?;
4549
4550        Ok(match kind {
4551            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4552            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4553            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4554        })
4555    }
4556
4557    /// Retrieve the host-visible identifier for this `Waitable`.
4558    fn rep(&self) -> u32 {
4559        match self {
4560            Self::Host(id) => id.rep(),
4561            Self::Guest(id) => id.rep(),
4562            Self::Transmit(id) => id.rep(),
4563        }
4564    }
4565
4566    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4567    /// remove it from any set it may currently belong to (when `set` is
4568    /// `None`).
4569    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4570        log::trace!("waitable {self:?} join set {set:?}");
4571
4572        let old = mem::replace(&mut self.common(state)?.set, set);
4573
4574        if let Some(old) = old {
4575            match *self {
4576                Waitable::Host(id) => state.remove_child(id, old),
4577                Waitable::Guest(id) => state.remove_child(id, old),
4578                Waitable::Transmit(id) => state.remove_child(id, old),
4579            }?;
4580
4581            state.get_mut(old)?.ready.remove(self);
4582        }
4583
4584        if let Some(set) = set {
4585            match *self {
4586                Waitable::Host(id) => state.add_child(id, set),
4587                Waitable::Guest(id) => state.add_child(id, set),
4588                Waitable::Transmit(id) => state.add_child(id, set),
4589            }?;
4590
4591            if self.common(state)?.event.is_some() {
4592                self.mark_ready(state)?;
4593            }
4594        }
4595
4596        Ok(())
4597    }
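
    // Illustrative sketch of how set membership moves (assuming `state` is the
    // store's `ConcurrentState` and `w`, `a`, and `b` are existing entries in
    // its table):
    //
    //     w.join(&mut state, Some(a))?; // added to `a`; marked ready there if it has an event
    //     w.join(&mut state, Some(b))?; // removed from `a`, then added to `b`
    //     w.join(&mut state, None)?;    // removed from any set entirely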
4598
4599    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4600    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4601        Ok(match self {
4602            Self::Host(id) => &mut state.get_mut(*id)?.common,
4603            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4604            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4605        })
4606    }
4607
4608    /// Set or clear the pending event for this waitable and either deliver it
4609    /// to the first waiter, if any, or mark it as ready to be delivered to the
4610    /// next waiter that arrives.
4611    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4612        log::trace!("set event for {self:?}: {event:?}");
4613        self.common(state)?.event = event;
4614        self.mark_ready(state)
4615    }
4616
4617    /// Take the pending event from this waitable, leaving `None` in its place.
4618    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4619        let common = self.common(state)?;
4620        let event = common.event.take();
4621        if let Some(set) = self.common(state)?.set {
4622            state.get_mut(set)?.ready.remove(self);
4623        }
4624
4625        Ok(event)
4626    }
4627
4628    /// Deliver the current event for this waitable to the first waiter, if any,
4629    /// or else mark it as ready to be delivered to the next waiter that
4630    /// arrives.
4631    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4632        if let Some(set) = self.common(state)?.set {
4633            state.get_mut(set)?.ready.insert(*self);
4634            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4635                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4636                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4637
4638                let item = match mode {
4639                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4640                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4641                        thread,
4642                        kind: GuestCallKind::DeliverEvent {
4643                            instance,
4644                            set: Some(set),
4645                        },
4646                    }),
4647                };
4648                state.push_high_priority(item);
4649            }
4650        }
4651        Ok(())
4652    }
4653
4654    /// Remove this waitable from the concurrent state's rep table.
4655    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4656        match self {
4657            Self::Host(task) => {
4658                log::trace!("delete host task {task:?}");
4659                state.delete(*task)?;
4660            }
4661            Self::Guest(task) => {
4662                log::trace!("delete guest task {task:?}");
4663                state.delete(*task)?.dispose(state, *task)?;
4664            }
4665            Self::Transmit(task) => {
4666                state.delete(*task)?;
4667            }
4668        }
4669
4670        Ok(())
4671    }
4672}
4673
4674impl fmt::Debug for Waitable {
4675    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4676        match self {
4677            Self::Host(id) => write!(f, "{id:?}"),
4678            Self::Guest(id) => write!(f, "{id:?}"),
4679            Self::Transmit(id) => write!(f, "{id:?}"),
4680        }
4681    }
4682}
4683
4684/// Represents a Component Model Async `waitable-set`.
4685#[derive(Default)]
4686struct WaitableSet {
4687    /// Which waitables in this set have pending events, if any.
4688    ready: BTreeSet<Waitable>,
4689    /// Which guest threads are currently waiting on this set, if any.
4690    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4691}
4692
4693impl TableDebug for WaitableSet {
4694    fn type_name() -> &'static str {
4695        "WaitableSet"
4696    }
4697}
4698
4699/// Type-erased closure to lower the parameters for a guest task.
4700type RawLower =
4701    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4702
4703/// Type-erased closure to lift the result for a guest task.
4704type RawLift = Box<
4705    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4706>;
4707
4708/// Type-erased result of a guest task which may be downcast to the expected
4709/// type by a host caller (or simply ignored in the case of a guest caller; see
4710/// `DummyResult`).
4711type LiftedResult = Box<dyn Any + Send + Sync>;
4712
4713/// Used to return a result from a `LiftFn` when the actual result has already
4714/// been lowered to a guest task's stack and linear memory.
4715struct DummyResult;
4716
4717/// Represents the Component Model Async state of a (sub-)component instance.
4718#[derive(Default)]
4719pub struct ConcurrentInstanceState {
4720    /// Backpressure counter for this instance (backpressure is enabled while >0).
4721    backpressure: u16,
4722    /// Whether this instance must not currently be entered.
4723    do_not_enter: bool,
4724    /// Pending calls for this instance which cannot proceed until
4725    /// `Self::backpressure` is zero and `Self::do_not_enter` is `false`.
4726    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4727}
4728
4729impl ConcurrentInstanceState {
4730    pub fn pending_is_empty(&self) -> bool {
4731        self.pending.is_empty()
4732    }
4733}
4734
4735/// Represents the Component Model Async state of a store.
4736pub struct ConcurrentState {
4737    /// The currently running guest thread, if any.
4738    guest_thread: Option<QualifiedThreadId>,
4739
4740    /// The set of pending host and background tasks, if any.
4741    ///
4742    /// See `ComponentInstance::poll_until` for where we temporarily take this
4743    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4744    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4745    /// The table of waitables, waitable sets, etc.
4746    table: AlwaysMut<ResourceTable>,
4747    /// The "high priority" work queue for this store's event loop.
4748    high_priority: Vec<WorkItem>,
4749    /// The "low priority" work queue for this store's event loop.
4750    low_priority: Vec<WorkItem>,
4751    /// A place to stash the reason a fiber is suspending so that the code which
4752    /// resumed it will know under what conditions the fiber should be resumed
4753    /// again.
4754    suspend_reason: Option<SuspendReason>,
4755    /// A cached fiber which is waiting for work to do.
4756    ///
4757    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4758    worker: Option<StoreFiber<'static>>,
4759    /// A place to stash the work item for which we're resuming a worker fiber.
4760    worker_item: Option<WorkerItem>,
4761
4762    /// Reference counts for all component error contexts
4763    ///
4764    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
4765    /// (sub)component ref counts as tracked by `error_context_tables`, for
4766    /// example when the host holds one or more references to error contexts.
4767    ///
4768    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4769    /// component-wide representation) of the index into concurrent state for a given
4770    /// stored `ErrorContext`.
4771    ///
4772    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4773    /// as a `TableId<ErrorContextState>`.
4774    global_error_context_ref_counts:
4775        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4776}
4777
4778impl Default for ConcurrentState {
4779    fn default() -> Self {
4780        Self {
4781            guest_thread: None,
4782            table: AlwaysMut::new(ResourceTable::new()),
4783            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4784            high_priority: Vec::new(),
4785            low_priority: Vec::new(),
4786            suspend_reason: None,
4787            worker: None,
4788            worker_item: None,
4789            global_error_context_ref_counts: BTreeMap::new(),
4790        }
4791    }
4792}
4793
4794impl ConcurrentState {
4795    /// Take ownership of any fibers and futures owned by this object.
4796    ///
4797    /// This should be used when disposing of the `Store` containing this object
4798    /// in order to gracefully resolve any and all fibers using
4799    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4800    /// use-after-free bugs due to fibers which may still have access to the
4801    /// `Store`.
4802    ///
4803    /// Additionally, the futures collected with this function should be dropped
4804    /// within a `tls::set` call, which will ensure that any futures closing
4805    /// over an `&Accessor` will have access to the store when dropped, allowing
4806    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4807    /// panicking.
4808    ///
4809    /// Note that this will leave the object in an inconsistent and unusable
4810    /// state, so it should only be used just prior to dropping it.
4811    pub(crate) fn take_fibers_and_futures(
4812        &mut self,
4813        fibers: &mut Vec<StoreFiber<'static>>,
4814        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4815    ) {
4816        for entry in self.table.get_mut().iter_mut() {
4817            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4818                for mode in mem::take(&mut set.waiting).into_values() {
4819                    if let WaitMode::Fiber(fiber) = mode {
4820                        fibers.push(fiber);
4821                    }
4822                }
4823            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4824                if let GuestThreadState::Suspended(fiber) =
4825                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4826                {
4827                    fibers.push(fiber);
4828                }
4829            }
4830        }
4831
4832        if let Some(fiber) = self.worker.take() {
4833            fibers.push(fiber);
4834        }
4835
4836        let mut take_items = |list| {
4837            for item in mem::take(list) {
4838                match item {
4839                    WorkItem::ResumeFiber(fiber) => {
4840                        fibers.push(fiber);
4841                    }
4842                    WorkItem::PushFuture(future) => {
4843                        self.futures
4844                            .get_mut()
4845                            .as_mut()
4846                            .unwrap()
4847                            .push(future.into_inner());
4848                    }
4849                    _ => {}
4850                }
4851            }
4852        };
4853
4854        take_items(&mut self.high_priority);
4855        take_items(&mut self.low_priority);
4856
4857        if let Some(them) = self.futures.get_mut().take() {
4858            futures.push(them);
4859        }
4860    }
4861
4862    fn push<V: Send + Sync + 'static>(
4863        &mut self,
4864        value: V,
4865    ) -> Result<TableId<V>, ResourceTableError> {
4866        self.table.get_mut().push(value).map(TableId::from)
4867    }
4868
4869    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4870        self.table.get_mut().get_mut(&Resource::from(id))
4871    }
4872
4873    pub fn add_child<T: 'static, U: 'static>(
4874        &mut self,
4875        child: TableId<T>,
4876        parent: TableId<U>,
4877    ) -> Result<(), ResourceTableError> {
4878        self.table
4879            .get_mut()
4880            .add_child(Resource::from(child), Resource::from(parent))
4881    }
4882
4883    pub fn remove_child<T: 'static, U: 'static>(
4884        &mut self,
4885        child: TableId<T>,
4886        parent: TableId<U>,
4887    ) -> Result<(), ResourceTableError> {
4888        self.table
4889            .get_mut()
4890            .remove_child(Resource::from(child), Resource::from(parent))
4891    }
4892
4893    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4894        self.table.get_mut().delete(Resource::from(id))
4895    }
4896
4897    fn push_future(&mut self, future: HostTaskFuture) {
4898        // Note that we can't directly push to `ConcurrentState::futures` here
4899        // since this may be called from a future that's being polled inside
4900        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4901        // so it has exclusive access while polling it.  Therefore, we push a
4902        // work item to the "high priority" queue, which will actually push to
4903        // `ConcurrentState::futures` later.
4904        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4905    }
4906
4907    fn push_high_priority(&mut self, item: WorkItem) {
4908        log::trace!("push high priority: {item:?}");
4909        self.high_priority.push(item);
4910    }
4911
4912    fn push_low_priority(&mut self, item: WorkItem) {
4913        log::trace!("push low priority: {item:?}");
4914        self.low_priority.push(item);
4915    }
4916
4917    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4918        if high_priority {
4919            self.push_high_priority(item);
4920        } else {
4921            self.push_low_priority(item);
4922        }
4923    }
4924
4925    /// Determine whether the instance associated with the specified guest task
4926    /// may be entered (i.e. is not already on the async call stack).
4927    ///
4928    /// This is an additional check on top of the "may_enter" instance flag;
4929    /// it's needed because async-lifted exports with callback functions must
4930    /// not call their own instances directly or indirectly, and due to the
4931    /// "stackless" nature of callback-enabled guest tasks this may happen even
4932    /// if there are no activation records on the stack (i.e. the "may_enter"
4933    /// field is `true`) for that instance.
4934    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4935        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4936
4937        // Walk the task tree back to the root, looking for potential
4938        // reentrance.
4939        //
4940        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4941        // such that each bit represents an instance which has been entered by
4942        // that task or an ancestor of that task, in which case this would be a
4943        // constant time check.
4944        loop {
4945            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4946                Caller::Host { .. } => break true,
4947                Caller::Guest { thread, instance } => {
4948                    if *instance == guest_instance.index {
4949                        break false;
4950                    } else {
4951                        *thread
4952                    }
4953                }
4954            };
4955            guest_task = next_thread.task;
4956        }
4957    }
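
    // For example, suppose instance `A` exports a callback-enabled async
    // function which, through a chain of guest->guest calls, ends up attempting
    // another call back into `A`.  The walk above finds a `Caller::Guest`
    // ancestor whose `instance` matches and returns `false`, even though `A`
    // may have no activation records on the stack at that point.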
4958
4959    /// Implements the `context.get` intrinsic.
4960    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4961        let thread = self.guest_thread.unwrap();
4962        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4963        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4964        Ok(val)
4965    }
4966
4967    /// Implements the `context.set` intrinsic.
4968    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4969        let thread = self.guest_thread.unwrap();
4970        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4971        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4972        Ok(())
4973    }
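
    // Illustrative sketch of the intrinsic semantics (assuming `state` has a
    // current guest thread): each guest thread carries two `u32` slots which
    // round-trip through `context.set`/`context.get`:
    //
    //     state.context_set(0, 0xdead_beef)?;
    //     assert_eq!(state.context_get(0)?, 0xdead_beef);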
4974
4975    /// Returns whether there's a pending cancellation on the current guest thread,
4976    /// consuming the event if so.
4977    fn take_pending_cancellation(&mut self) -> bool {
4978        let thread = self.guest_thread.unwrap();
4979        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4980            assert!(matches!(event, Event::Cancelled));
4981            true
4982        } else {
4983            false
4984        }
4985    }
4986
4987    fn check_blocking(&mut self) -> Result<()> {
4988        let task = self.guest_thread.unwrap().task;
4989        self.check_blocking_for(task)
4990    }
4991
4992    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4993        if self.may_block(task) {
4994            Ok(())
4995        } else {
4996            Err(Trap::CannotBlockSyncTask.into())
4997        }
4998    }
4999
5000    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5001        let task = self.get_mut(task).unwrap();
5002        task.async_function || task.returned_or_cancelled()
5003    }
5004}
5005
5006/// Provide a type hint to compiler about the shape of a parameter lower
5007/// closure.
5008fn for_any_lower<
5009    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5010>(
5011    fun: F,
5012) -> F {
5013    fun
5014}
5015
5016/// Provide a type hint to compiler about the shape of a result lift closure.
5017fn for_any_lift<
5018    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5019>(
5020    fun: F,
5021) -> F {
5022    fun
5023}
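
// These identity helpers give the compiler a concrete `FnOnce` signature to
// check a closure against before it is boxed and type-erased into a
// `RawLower`/`RawLift`.  A sketch (the closure body is a placeholder):
//
//     let lift: RawLift =
//         Box::new(for_any_lift(move |_store, _vals| Ok(Box::new(()))));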
5024
5025/// Wrap the specified future in a `poll_fn` which asserts that the future is
5026/// only polled from the event loop of the specified `Store`.
5027///
5028/// See `StoreContextMut::run_concurrent` for details.
5029fn checked<F: Future + Send + 'static>(
5030    id: StoreId,
5031    fut: F,
5032) -> impl Future<Output = F::Output> + Send + 'static {
5033    async move {
5034        let mut fut = pin!(fut);
5035        future::poll_fn(move |cx| {
5036            let message = "\
5037                `Future`s which depend on asynchronous component tasks, streams, or \
5038                futures to complete may only be polled from the event loop of the \
5039                store to which they belong.  Please use \
5040                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5041            ";
5042            tls::try_get(|store| {
5043                let matched = match store {
5044                    tls::TryGet::Some(store) => store.id() == id,
5045                    tls::TryGet::Taken | tls::TryGet::None => false,
5046                };
5047
5048                if !matched {
5049                    panic!("{message}")
5050                }
5051            });
5052            fut.as_mut().poll(cx)
5053        })
5054        .await
5055    }
5056}
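
// `queue_call` below relies on this: the result future handed back to the host
// is wrapped with `checked`, so polling it anywhere other than the owning
// store's event loop (e.g. directly on an external executor) hits the panic
// message above.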
5057
5058/// Assert that `StoreContextMut::run_concurrent` has not been called from
5059/// within a store's event loop.
5060fn check_recursive_run() {
5061    tls::try_get(|store| {
5062        if !matches!(store, tls::TryGet::None) {
5063            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5064        }
5065    });
5066}
5067
5068fn unpack_callback_code(code: u32) -> (u32, u32) {
5069    (code & 0xF, code >> 4)
5070}
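
// The low four bits carry the code itself and the remaining bits carry its
// payload (e.g. a waitable-set handle), so, for example:
//
//     assert_eq!(unpack_callback_code(0x32), (0x2, 0x3));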
5071
5072/// Helper struct for packaging parameters to be passed to
5073/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5074/// `waitable-set.poll`.
5075struct WaitableCheckParams {
5076    set: TableId<WaitableSet>,
5077    options: OptionsIndex,
5078    payload: u32,
5079}
5080
5081/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
5082enum WaitableCheck {
5083    Wait(WaitableCheckParams),
5084    Poll(WaitableCheckParams),
5085}
5086
5087/// Represents a guest task called from the host, prepared using `prepare_call`.
5088pub(crate) struct PreparedCall<R> {
5089    /// The guest export to be called
5090    handle: Func,
5091    /// The guest thread created by `prepare_call`
5092    thread: QualifiedThreadId,
5093    /// The number of lowered core Wasm parameters to pass to the call.
5094    param_count: usize,
5095    /// The `oneshot::Receiver` to which the result of the call will be
5096    /// delivered when it is available.
5097    rx: oneshot::Receiver<LiftedResult>,
5098    /// The `oneshot::Receiver` which will resolve when the task -- and any
5099    /// transitive subtasks -- have all exited.
5100    exit_rx: oneshot::Receiver<()>,
5101    _phantom: PhantomData<R>,
5102}
5103
5104impl<R> PreparedCall<R> {
5105    /// Get a copy of the `TaskId` for this `PreparedCall`.
5106    pub(crate) fn task_id(&self) -> TaskId {
5107        TaskId {
5108            task: self.thread.task,
5109        }
5110    }
5111}
5112
5113/// Represents a task created by `prepare_call`.
5114pub(crate) struct TaskId {
5115    task: TableId<GuestTask>,
5116}
5117
5118impl TaskId {
5119    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5120    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5121    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5122    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5123    /// and delete the task when all threads are done.
5124    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5125        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5126        if !task.already_lowered_parameters() {
5127            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5128        } else {
5129            task.host_future_state = HostFutureState::Dropped;
5130            if task.ready_to_delete() {
5131                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5132            }
5133        }
5134        Ok(())
5135    }
5136}
5137
5138/// Prepare a call to the specified exported Wasm function, providing functions
5139/// for lowering the parameters and lifting the result.
5140///
5141/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5142/// loop, use `queue_call`.
5143pub(crate) fn prepare_call<T, R>(
5144    mut store: StoreContextMut<T>,
5145    handle: Func,
5146    param_count: usize,
5147    host_future_present: bool,
5148    call_post_return_automatically: bool,
5149    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5150    + Send
5151    + Sync
5152    + 'static,
5153    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5154    + Send
5155    + Sync
5156    + 'static,
5157) -> Result<PreparedCall<R>> {
5158    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5159
5160    let instance = handle.instance().id().get(store.0);
5161    let options = &instance.component().env_component().options[options];
5162    let ty = &instance.component().types()[ty];
5163    let async_function = ty.async_;
5164    let task_return_type = ty.results;
5165    let component_instance = raw_options.instance;
5166    let callback = options.callback.map(|i| instance.runtime_callback(i));
5167    let memory = options
5168        .memory()
5169        .map(|i| instance.runtime_memory(i))
5170        .map(SendSyncPtr::new);
5171    let string_encoding = options.string_encoding;
5172    let token = StoreToken::new(store.as_context_mut());
5173    let state = store.0.concurrent_state_mut();
5174
5175    let (tx, rx) = oneshot::channel();
5176    let (exit_tx, exit_rx) = oneshot::channel();
5177
5178    let mut task = GuestTask::new(
5179        state,
5180        Box::new(for_any_lower(move |store, params| {
5181            lower_params(handle, token.as_context_mut(store), params)
5182        })),
5183        LiftResult {
5184            lift: Box::new(for_any_lift(move |store, result| {
5185                lift_result(handle, store, result)
5186            })),
5187            ty: task_return_type,
5188            memory,
5189            string_encoding,
5190        },
5191        Caller::Host {
5192            tx: Some(tx),
5193            exit_tx: Arc::new(exit_tx),
5194            host_future_present,
5195            call_post_return_automatically,
5196        },
5197        callback.map(|callback| {
5198            let callback = SendSyncPtr::new(callback);
5199            let instance = handle.instance();
5200            Box::new(
5201                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5202                    let store = token.as_context_mut(store);
5203                    // SAFETY: Per the contract of `prepare_call`, the callback
5204                    // will remain valid at least as long as this task exists.
5205                    unsafe {
5206                        instance.call_callback(
5207                            store,
5208                            runtime_instance,
5209                            callback,
5210                            event,
5211                            handle,
5212                            call_post_return_automatically,
5213                        )
5214                    }
5215                },
5216            ) as CallbackFn
5217        }),
5218        handle.instance(),
5219        component_instance,
5220        async_function,
5221    )?;
5222    task.function_index = Some(handle.index());
5223
5224    let task = state.push(task)?;
5225    let thread = state.push(GuestThread::new_implicit(task))?;
5226    state.get_mut(task)?.threads.insert(thread);
5227
5228    Ok(PreparedCall {
5229        handle,
5230        thread: QualifiedThreadId { task, thread },
5231        param_count,
5232        rx,
5233        exit_rx,
5234        _phantom: PhantomData,
5235    })
5236}
5237
5238/// Queue a call previously prepared using `prepare_call` to be run as part of
5239/// the associated `ComponentInstance`'s event loop.
5240///
5241/// The returned future will resolve to the result once it is available, but
5242/// must only be polled via the instance's event loop. See
5243/// `StoreContextMut::run_concurrent` for details.
5244pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5245    mut store: StoreContextMut<T>,
5246    prepared: PreparedCall<R>,
5247) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5248    let PreparedCall {
5249        handle,
5250        thread,
5251        param_count,
5252        rx,
5253        exit_rx,
5254        ..
5255    } = prepared;
5256
5257    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5258
5259    Ok(checked(
5260        store.0.id(),
5261        rx.map(move |result| {
5262            result
5263                .map(|v| (*v.downcast().unwrap(), exit_rx))
5264                .map_err(anyhow::Error::from)
5265        }),
5266    ))
5267}
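
// Illustrative sketch of how these entry points combine (`lower`, `lift`, and
// `MyResult` are placeholders supplied by the caller):
//
//     let prepared = prepare_call::<_, MyResult>(
//         store.as_context_mut(),
//         func,
//         param_count,
//         /* host_future_present */ true,
//         /* call_post_return_automatically */ true,
//         lower,
//         lift,
//     )?;
//     let result = queue_call(store.as_context_mut(), prepared)?;
//     // `result` must then be awaited from within this store's event loop,
//     // e.g. via `StoreContextMut::run_concurrent`.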
5268
5269/// Queue a call previously prepared using `prepare_call` to be run as part of
5270/// the associated `ComponentInstance`'s event loop.
5271fn queue_call0<T: 'static>(
5272    store: StoreContextMut<T>,
5273    handle: Func,
5274    guest_thread: QualifiedThreadId,
5275    param_count: usize,
5276) -> Result<()> {
5277    let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5278    let is_concurrent = raw_options.async_;
5279    let callback = raw_options.callback;
5280    let instance = handle.instance();
5281    let callee = handle.lifted_core_func(store.0);
5282    let post_return = handle.post_return_core_func(store.0);
5283    let callback = callback.map(|i| {
5284        let instance = instance.id().get(store.0);
5285        SendSyncPtr::new(instance.runtime_callback(i))
5286    });
5287
5288    log::trace!("queueing call {guest_thread:?}");
5289
5290    let instance_flags = if callback.is_none() {
5291        None
5292    } else {
5293        Some(flags)
5294    };
5295
5296    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5297    // (with signatures appropriate for this call) and will remain valid as
5298    // long as this instance is valid.
5299    unsafe {
5300        instance.queue_call(
5301            store,
5302            guest_thread,
5303            SendSyncPtr::new(callee),
5304            param_count,
5305            1,
5306            instance_flags,
5307            is_concurrent,
5308            callback,
5309            post_return.map(SendSyncPtr::new),
5310        )
5311    }
5312}