wasmtime/runtime/component/concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! store, allowing any and all tasks belonging to that store to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified store.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState, ResourceTables};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64    bail,
65    error::{Context as _, format_err},
66};
67use error_contexts::GlobalErrorContextRefCount;
68use futures::channel::oneshot;
69use futures::future::{self, FutureExt};
70use futures::stream::{FuturesUnordered, StreamExt};
71use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
72use std::any::Any;
73use std::borrow::ToOwned;
74use std::boxed::Box;
75use std::cell::UnsafeCell;
76use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
77use std::fmt;
78use std::future::Future;
79use std::marker::PhantomData;
80use std::mem::{self, ManuallyDrop, MaybeUninit};
81use std::ops::DerefMut;
82use std::pin::{Pin, pin};
83use std::ptr::{self, NonNull};
84use std::sync::Arc;
85use std::task::{Context, Poll, Waker};
86use std::vec::Vec;
87use table::{TableDebug, TableId};
88use wasmtime_environ::Trap;
89use wasmtime_environ::component::{
90    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97
98pub use abort::JoinHandle;
99pub use future_stream_any::{FutureAny, StreamAny};
100pub use futures_and_streams::{
101    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
102    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
103    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
104};
105pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
106
107mod abort;
108mod error_contexts;
109mod future_stream_any;
110mod futures_and_streams;
111pub(crate) mod table;
112pub(crate) mod tls;
113
114/// Constant defined in the Component Model spec to indicate that the async
115/// intrinsic (e.g. `future.write`) has not yet completed.
116const BLOCKED: u32 = 0xffff_ffff;
117
118/// Corresponds to `CallState` in the upstream spec.
119#[derive(Clone, Copy, Eq, PartialEq, Debug)]
120pub enum Status {
121    Starting = 0,
122    Started = 1,
123    Returned = 2,
124    StartCancelled = 3,
125    ReturnCancelled = 4,
126}
127
128impl Status {
129    /// Packs this status and the optional `waitable` provided into a 32-bit
130    /// result that the canonical ABI requires.
131    ///
132    /// The low 4 bits are reserved for the status while the upper 28 bits are
133    /// the waitable, if present.
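    ///
    /// For example (a sketch of the encoding; the concrete values are just
    /// illustrative):
    ///
    /// ```ignore
    /// // waitable handle 7, status `Started` (1) => (7 << 4) | 1 == 0x71
    /// assert_eq!(Status::Started.pack(Some(7)), 0x71);
    /// // `Returned` (2) never carries a waitable handle
    /// assert_eq!(Status::Returned.pack(None), 2);
    /// ```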
134    pub fn pack(self, waitable: Option<u32>) -> u32 {
135        assert!(matches!(self, Status::Returned) == waitable.is_none());
136        let waitable = waitable.unwrap_or(0);
137        assert!(waitable < (1 << 28));
138        (waitable << 4) | (self as u32)
139    }
140}
141
142/// Corresponds to `EventCode` in the Component Model spec, plus related payload
143/// data.
144#[derive(Clone, Copy, Debug)]
145enum Event {
146    None,
147    Cancelled,
148    Subtask {
149        status: Status,
150    },
151    StreamRead {
152        code: ReturnCode,
153        pending: Option<(TypeStreamTableIndex, u32)>,
154    },
155    StreamWrite {
156        code: ReturnCode,
157        pending: Option<(TypeStreamTableIndex, u32)>,
158    },
159    FutureRead {
160        code: ReturnCode,
161        pending: Option<(TypeFutureTableIndex, u32)>,
162    },
163    FutureWrite {
164        code: ReturnCode,
165        pending: Option<(TypeFutureTableIndex, u32)>,
166    },
167}
168
169impl Event {
170    /// Lower this event to core Wasm integers for delivery to the guest.
171    ///
172    /// Note that the waitable handle, if any, is assumed to be lowered
173    /// separately.
174    fn parts(self) -> (u32, u32) {
175        const EVENT_NONE: u32 = 0;
176        const EVENT_SUBTASK: u32 = 1;
177        const EVENT_STREAM_READ: u32 = 2;
178        const EVENT_STREAM_WRITE: u32 = 3;
179        const EVENT_FUTURE_READ: u32 = 4;
180        const EVENT_FUTURE_WRITE: u32 = 5;
181        const EVENT_CANCELLED: u32 = 6;
182        match self {
183            Event::None => (EVENT_NONE, 0),
184            Event::Cancelled => (EVENT_CANCELLED, 0),
185            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
186            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
187            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
188            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
189            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
190        }
191    }
192}
193
194/// Corresponds to `CallbackCode` in the spec.
195mod callback_code {
196    pub const EXIT: u32 = 0;
197    pub const YIELD: u32 = 1;
198    pub const WAIT: u32 = 2;
199}
200
201/// A flag indicating that the callee is an async-lifted export.
202///
203/// This may be passed to the `async-start` intrinsic from a fused adapter.
204const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
205
206/// Provides access to either store data (via the `get` method) or the store
207/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
208/// instance to which the current host task belongs.
209///
210/// See [`Accessor::with`] for details.
211pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
212    store: StoreContextMut<'a, T>,
213    get_data: fn(&mut T) -> D::Data<'_>,
214}
215
216impl<'a, T, D> Access<'a, T, D>
217where
218    D: HasData + ?Sized,
219    T: 'static,
220{
221    /// Creates a new [`Access`] from its component parts.
222    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
223        Self { store, get_data }
224    }
225
226    /// Get mutable access to the store data.
227    pub fn data_mut(&mut self) -> &mut T {
228        self.store.data_mut()
229    }
230
231    /// Get access to this accessor's view of the store data, i.e. `D::Data<'_>`.
232    pub fn get(&mut self) -> D::Data<'_> {
233        (self.get_data)(self.data_mut())
234    }
235
236    /// Spawn a background task.
237    ///
238    /// See [`Accessor::spawn`] for details.
239    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
240    where
241        T: 'static,
242    {
243        let accessor = Accessor {
244            get_data: self.get_data,
245            token: StoreToken::new(self.store.as_context_mut()),
246        };
247        self.store
248            .as_context_mut()
249            .spawn_with_accessor(accessor, task)
250    }
251
252    /// Returns the getter this accessor is using to project from `T` into
253    /// `D::Data`.
254    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
255        self.get_data
256    }
257}
258
259impl<'a, T, D> AsContext for Access<'a, T, D>
260where
261    D: HasData + ?Sized,
262    T: 'static,
263{
264    type Data = T;
265
266    fn as_context(&self) -> StoreContext<'_, T> {
267        self.store.as_context()
268    }
269}
270
271impl<'a, T, D> AsContextMut for Access<'a, T, D>
272where
273    D: HasData + ?Sized,
274    T: 'static,
275{
276    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
277        self.store.as_context_mut()
278    }
279}
280
281/// Provides scoped mutable access to store data in the context of a concurrent
282/// host task future.
283///
284/// This allows multiple host task futures to execute concurrently and access
285/// the store between (but not across) `await` points.
286///
287/// # Rationale
288///
289/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
290/// `D::Data<'_>`, except that it does not literally store these values. The
291/// basic problem is that when a concurrent
292/// host future is being polled it has access to `&mut T` (and the whole
293/// `Store`) but when it's not being polled it does not have access to these
294/// values. This reflects how the store is only ever polling one future at a
295/// time so the store is effectively being passed between futures.
296///
297/// Rust's `Future` trait, however, has no means of passing a `Store`
298/// temporarily between futures. The [`Context`](std::task::Context) type does
299/// not have the ability to attach arbitrary information to it at this time.
300/// This type, [`Accessor`], is used to bridge this expressivity gap.
301///
302/// The [`Accessor`] type here represents the ability to acquire, temporarily in
303/// a synchronous manner, the current store. The [`Accessor::with`] function
304/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
305/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
306/// not take an `async` closure as its argument; instead it's a synchronous
307/// closure which must complete during one run of `Future::poll`. This reflects
308/// how the store is temporarily made available while a host future is being
309/// polled.
310///
311/// # Implementation
312///
313/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
314/// this type additionally doesn't even have a lifetime parameter. This is
315/// instead a representation of proof of the ability to acquire these while a
316/// future is being polled. Wasmtime will, when it polls a host future,
317/// configure ambient state such that the `Accessor` that a future closes over
318/// will work and be able to access the store.
319///
320/// This has a number of implications for users such as:
321///
322/// * It's intentional that `Accessor` cannot be cloned; it needs to stay within
323///   the lifetime of a single future.
324/// * A future is expected to, however, close over an `Accessor` and keep it
325///   alive probably for the duration of the entire future.
326/// * Different host futures will be given different `Accessor`s, and that's
327///   intentional.
328/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
329///   alleviates some otherwise required bounds to be written down.
330///
331/// # Using `Accessor` in `Drop`
332///
333/// The methods on `Accessor` are only expected to work in the context of
334/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
335/// host future can be dropped at any time throughout the system and Wasmtime
336/// store context is not necessarily available at that time. It's recommended to
337/// not use `Accessor` methods in anything connected to a `Drop` implementation
338/// as they may panic or have other unintended results. If you run into this,
339/// though, feel free to file an issue on the Wasmtime repository.
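///
/// # Example
///
/// A minimal sketch of a host task body (the `Ctx` type, its methods, and
/// `do_some_io` are hypothetical):
///
/// ```ignore
/// async fn host_task(accessor: &Accessor<Ctx>) -> wasmtime::Result<()> {
///     // Store access is synchronous and scoped to the closure...
///     let id = accessor.with(|mut access| access.get().allocate_id());
///     // ...and is never held across an `await` point.
///     do_some_io(id).await;
///     accessor.with(|mut access| access.get().record_completion(id));
///     Ok(())
/// }
/// ```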
340pub struct Accessor<T: 'static, D = HasSelf<T>>
341where
342    D: HasData + ?Sized,
343{
344    token: StoreToken<T>,
345    get_data: fn(&mut T) -> D::Data<'_>,
346}
347
348/// A helper trait to take any type of accessor-with-data in functions.
349///
350/// This trait is similar to [`AsContextMut`] except that it's used when
351/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
352/// [`Accessor`] is the main type used in concurrent settings and is passed to
353/// functions such as [`Func::call_concurrent`].
354///
355/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
356/// this trait. This effectively means that regardless of the `D` in
357/// `Accessor<T, D>` it can still be passed to a function which just needs a
358/// store accessor.
359///
360/// Acquiring an [`Accessor`] can be done through
361/// [`StoreContextMut::run_concurrent`] for example or in a host function
362/// through
363/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
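///
/// For example, a sketch of a helper that accepts any accessor over a
/// hypothetical `Ctx` store data type, regardless of its `D` projection:
///
/// ```ignore
/// fn bump_counter(acc: impl AsAccessor<Data = Ctx>) {
///     acc.as_accessor()
///         .with(|mut access| access.data_mut().counter += 1);
/// }
/// ```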
364pub trait AsAccessor {
365    /// The `T` in `Store<T>` that this accessor refers to.
366    type Data: 'static;
367
368    /// The `D` in `Accessor<T, D>`, or the projection out of
369    /// `Self::Data`.
370    type AccessorData: HasData + ?Sized;
371
372    /// Returns the accessor that this is referring to.
373    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
374}
375
376impl<T: AsAccessor + ?Sized> AsAccessor for &T {
377    type Data = T::Data;
378    type AccessorData = T::AccessorData;
379
380    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
381        T::as_accessor(self)
382    }
383}
384
385impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
386    type Data = T;
387    type AccessorData = D;
388
389    fn as_accessor(&self) -> &Accessor<T, D> {
390        self
391    }
392}
393
394// Note that it is intentional at this time that `Accessor` does not actually
395// store `&mut T` or anything similar. This distinctly enables the `Accessor`
396// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
397// that matter). This is used to ergonomically simplify bindings where the
398// majority of the time `Accessor` is closed over in a future which then needs
399// to be `Send` and `Sync`. Making `Accessor` unconditionally `Send + Sync`
400// avoids having to write `T: Send` everywhere in addition to `T: 'static`.
401//
402// Note as well that `Accessor` doesn't actually store its data at all. Instead
403// it's more of a "proof" of what can be accessed from TLS. API design around
404// `Accessor` and functions like `Linker::func_wrap_concurrent` are
405// intentionally made to ensure that `Accessor` is ideally only used in the
406// context that TLS variables are actually set. For example host functions are
407// given `&Accessor`, not `Accessor`, and this prevents them from persisting
408// the value outside of a future. Within the future the TLS variables are all
409// guaranteed to be set while the future is being polled.
410//
411// Finally, this is not an ironclad guarantee, nor does it need to be.
412// The TLS APIs are designed to panic or otherwise model usage where they're
413// called recursively or similar. It's hoped that code cannot be constructed to
414// actually hit this at runtime but this is not a safety requirement at this
415// time.
416const _: () = {
417    const fn assert<T: Send + Sync>() {}
418    assert::<Accessor<UnsafeCell<u32>>>();
419};
420
421impl<T> Accessor<T> {
422    /// Creates a new `Accessor` for the store identified by `token`.
423    ///
424    /// The resulting accessor uses the identity projection for its data,
425    /// i.e. `get_data` is effectively `|x| x`, so `Access::get` yields the
426    /// store's `T` directly.
427    ///
428    /// Note that the store itself is not captured here; it is retrieved from
429    /// ambient (TLS) state whenever the accessor is used.
430    pub(crate) fn new(token: StoreToken<T>) -> Self {
431        Self {
432            token,
433            get_data: |x| x,
434        }
435    }
436}
437
438impl<T, D> Accessor<T, D>
439where
440    D: HasData + ?Sized,
441{
442    /// Run the specified closure, passing it mutable access to the store.
443    ///
444    /// This function is one of the main building blocks of the [`Accessor`]
445    /// type. This yields synchronous, blocking, access to the store via an
446    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
447    /// providing the ability to access `D` via [`Access::get`]. Note that the
448    /// `fun` here is given only temporary access to the store and `T`/`D`
449    /// meaning that the return value `R` here is not allowed to capture borrows
450    /// into the two. If access is needed to data within `T` or `D` outside of
451    /// this closure then it must be `clone`d out, for example.
452    ///
453    /// # Panics
454    ///
455    /// This function will panic if it is called recursively with any other
456    /// accessor already in scope. For example if `with` is called within `fun`,
457    /// then this function will panic. It is up to the embedder to ensure that
458    /// this does not happen.
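    ///
    /// # Example
    ///
    /// A sketch (the field names on the store data are hypothetical):
    ///
    /// ```ignore
    /// let requests = accessor.with(|mut access| {
    ///     access.get().requests += 1;
    ///     access.get().requests // return an owned copy, not a borrow
    /// });
    /// ```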
459    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
460        tls::get(|vmstore| {
461            fun(Access {
462                store: self.token.as_context_mut(vmstore),
463                get_data: self.get_data,
464            })
465        })
466    }
467
468    /// Returns the getter this accessor is using to project from `T` into
469    /// `D::Data`.
470    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
471        self.get_data
472    }
473
474    /// Changes this accessor to access `D2` instead of the current type
475    /// parameter `D`.
476    ///
477    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
478    ///
479    /// # Panics
480    ///
481    /// When using this API the returned value is disconnected from `&self` and
482    /// the lifetime binding the `self` argument. An `Accessor` only works
483    /// within the context of the closure or async closure that it was
484    /// originally given to, however. Because the returned value has no
485    /// lifetime connecting it to the original accessor, it's possible to use
486    /// it outside the scope of that accessor, which will panic.
487    ///
488    /// The returned value should only be used within the scope of the original
489    /// `Accessor` that `self` refers to.
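    ///
    /// # Example
    ///
    /// A sketch, assuming a hypothetical `Ctx { limits: Limits }` store data
    /// type (and its `Limits` field type):
    ///
    /// ```ignore
    /// struct HasLimits;
    ///
    /// impl HasData for HasLimits {
    ///     type Data<'a> = &'a mut Limits;
    /// }
    ///
    /// fn narrow(accessor: &Accessor<Ctx>) -> Accessor<Ctx, HasLimits> {
    ///     accessor.with_getter(|ctx| &mut ctx.limits)
    /// }
    /// ```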
490    pub fn with_getter<D2: HasData>(
491        &self,
492        get_data: fn(&mut T) -> D2::Data<'_>,
493    ) -> Accessor<T, D2> {
494        Accessor {
495            token: self.token,
496            get_data,
497        }
498    }
499
500    /// Spawn a background task which will receive an `&Accessor<T, D>` and
501    /// run concurrently with any other tasks in progress for the current
502    /// store.
503    ///
504    /// This is particularly useful for host functions which return a `stream`
505    /// or `future` such that the code to write to the write end of that
506    /// `stream` or `future` must run after the function returns.
507    ///
508    /// The returned [`JoinHandle`] may be used to cancel the task.
509    ///
510    /// # Panics
511    ///
512    /// Panics if called within a closure provided to the [`Accessor::with`]
513    /// function. This can only be called outside an active invocation of
514    /// [`Accessor::with`].
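    ///
    /// For example (a sketch, where `FlushTask` is a hypothetical
    /// [`AccessorTask`] implementation; see that trait's documentation for
    /// what such an implementation looks like):
    ///
    /// ```ignore
    /// let _handle = accessor.spawn(FlushTask);
    /// // The returned handle may later be used to cancel the task if its
    /// // work is no longer needed.
    /// ```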
515    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
516    where
517        T: 'static,
518    {
519        let accessor = self.clone_for_spawn();
520        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
521    }
522
523    fn clone_for_spawn(&self) -> Self {
524        Self {
525            token: self.token,
526            get_data: self.get_data,
527        }
528    }
529}
530
531/// Represents a task which may be provided to `Accessor::spawn`,
532/// `Accessor::forward`, or `StoreContextMut::spawn`.
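///
/// For example, a minimal sketch of an implementation (everything other than
/// the trait itself is hypothetical):
///
/// ```ignore
/// struct FlushTask;
///
/// impl<T: 'static> AccessorTask<T> for FlushTask {
///     async fn run(self, accessor: &Accessor<T>) -> wasmtime::Result<()> {
///         // Touch the store only between `await` points, via `accessor.with`.
///         accessor.with(|_access| { /* ... */ });
///         Ok(())
///     }
/// }
/// ```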
533// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
534// option.
535//
536// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
537// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
538// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
539// Send + Sync + 'static` fails with a type mismatch error when we try to pass
540// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
541// best we can do for the time being.
542pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
543where
544    D: HasData + ?Sized,
545{
546    /// Run the task.
547    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
548}
549
550/// Represents parameter and result metadata for the caller side of a
551/// guest->guest call orchestrated by a fused adapter.
552enum CallerInfo {
553    /// Metadata for a call to an async-lowered import
554    Async {
555        params: Vec<ValRaw>,
556        has_result: bool,
557    },
558    /// Metadata for a call to a sync-lowered import
559    Sync {
560        params: Vec<ValRaw>,
561        result_count: u32,
562    },
563}
564
565/// Indicates how a guest task is waiting on a waitable set.
566enum WaitMode {
567    /// The guest task is waiting using `task.wait`
568    Fiber(StoreFiber<'static>),
569    /// The guest task is waiting via a callback declared as part of an
570    /// async-lifted export.
571    Callback(Instance),
572}
573
574/// Represents the reason a fiber is suspending itself.
575#[derive(Debug)]
576enum SuspendReason {
577    /// The fiber is waiting for an event to be delivered to the specified
578    /// waitable set or task.
579    Waiting {
580        set: TableId<WaitableSet>,
581        thread: QualifiedThreadId,
582        skip_may_block_check: bool,
583    },
584    /// The fiber has finished handling its most recent work item and is waiting
585    /// for another (or to be dropped if it is no longer needed).
586    NeedWork,
587    /// The fiber is yielding and should be resumed once other tasks have had a
588    /// chance to run.
589    Yielding {
590        thread: QualifiedThreadId,
591        skip_may_block_check: bool,
592    },
593    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
594    ExplicitlySuspending {
595        thread: QualifiedThreadId,
596        skip_may_block_check: bool,
597    },
598}
599
600/// Represents a pending call into guest code for a given guest task.
601enum GuestCallKind {
602    /// Indicates there's an event to deliver to the task, possibly related to a
603    /// waitable set the task has been waiting on or polling.
604    DeliverEvent {
605        /// The instance to which the task belongs.
606        instance: Instance,
607        /// The waitable set the event belongs to, if any.
608        ///
609        /// If this is `None` the event will be waiting in the
610        /// `GuestTask::event` field for the task.
611        set: Option<TableId<WaitableSet>>,
612    },
613    /// Indicates that a new guest task call is pending and may be executed
614    /// using the specified closure.
615    ///
616    /// If the closure returns `Ok(Some(call))`, the `call` should be run
617    /// immediately using `handle_guest_call`.
618    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
619    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
620}
621
622impl fmt::Debug for GuestCallKind {
623    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
624        match self {
625            Self::DeliverEvent { instance, set } => f
626                .debug_struct("DeliverEvent")
627                .field("instance", instance)
628                .field("set", set)
629                .finish(),
630            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
631            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
632        }
633    }
634}
635
636/// Represents a pending call into guest code for a given guest thread.
637#[derive(Debug)]
638struct GuestCall {
639    thread: QualifiedThreadId,
640    kind: GuestCallKind,
641}
642
643impl GuestCall {
644    /// Returns whether or not the call is ready to run.
645    ///
646    /// A call will not be ready to run if either:
647    ///
648    /// - the (sub-)component instance to be called has already been entered and
649    /// cannot be reentered until an in-progress call completes
650    ///
651    /// - the call is for a not-yet started task and the (sub-)component
652    /// instance to be called has backpressure enabled
653    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
654        let instance = store
655            .concurrent_state_mut()
656            .get_mut(self.thread.task)?
657            .instance;
658        let state = store.instance_state(instance).concurrent_state();
659
660        let ready = match &self.kind {
661            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
662            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
663            GuestCallKind::StartExplicit(_) => true,
664        };
665        log::trace!(
666            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
667            state.do_not_enter,
668            state.backpressure
669        );
670        Ok(ready)
671    }
672}
673
674/// Job to be run on a worker fiber.
675enum WorkerItem {
676    GuestCall(GuestCall),
677    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
678}
679
680/// Represents a pending work item to be handled by the event loop for a given
681/// component instance.
682enum WorkItem {
683    /// A host task to be pushed to `ConcurrentState::futures`.
684    PushFuture(AlwaysMut<HostTaskFuture>),
685    /// A fiber to resume.
686    ResumeFiber(StoreFiber<'static>),
687    /// A pending call into guest code for a given guest task.
688    GuestCall(GuestCall),
689    /// A job to run on a worker fiber.
690    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
691}
692
693impl fmt::Debug for WorkItem {
694    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
695        match self {
696            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
697            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
698            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
699            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
700        }
701    }
702}
703
704/// Whether a suspension intrinsic was cancelled or completed
705#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
706pub(crate) enum WaitResult {
707    Cancelled,
708    Completed,
709}
710
711/// Poll the specified future until it completes on behalf of a guest->host call
712/// using a sync-lowered import.
713///
714/// This is similar to `Instance::first_poll` except it's for sync-lowered
715/// imports, meaning we don't need to handle cancellation and we can block the
716/// caller until the task completes, at which point the caller can handle
717/// lowering the result to the guest's stack and linear memory.
718pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
719    store: &mut dyn VMStore,
720    future: impl Future<Output = Result<R>> + Send + 'static,
721    caller_instance: RuntimeInstance,
722) -> Result<R> {
723    let state = store.concurrent_state_mut();
724
725    let caller = state.guest_thread.unwrap();
726
727    // Save any existing result stashed in `GuestTask::result` so we can replace
728    // it with the new result.
729    let old_result = state
730        .get_mut(caller.task)
731        .with_context(|| format!("bad handle: {caller:?}"))?
732        .result
733        .take();
734
735    // Add a temporary host task into the table so we can track its progress.
736    // Note that we'll never allocate a waitable handle for the guest since
737    // we're being called synchronously.
738    let task = state.push(HostTask::new(caller_instance, None))?;
739
740    log::trace!("new host task child of {caller:?}: {task:?}");
741
742    // Wrap the future in a closure which will take care of stashing the result
743    // in `GuestTask::result` and resuming this fiber when the host task
744    // completes.
745    let mut future = Box::pin(async move {
746        let result = future.await?;
747        tls::get(move |store| {
748            let state = store.concurrent_state_mut();
749            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
750
751            Waitable::Host(task).set_event(
752                state,
753                Some(Event::Subtask {
754                    status: Status::Returned,
755                }),
756            )?;
757
758            Ok(())
759        })
760    }) as HostTaskFuture;
761
762    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
763    // add the future to `ConcurrentState::futures` and poll it automatically
764    // from the event loop if it doesn't complete immediately here.
765    let poll = tls::set(store, || {
766        future
767            .as_mut()
768            .poll(&mut Context::from_waker(&Waker::noop()))
769    });
770
771    match poll {
772        Poll::Ready(result) => {
773            // It completed immediately; check the result and delete the task.
774            result?;
775            log::trace!("delete host task {task:?} (already ready)");
776            store.concurrent_state_mut().delete(task)?;
777        }
778        Poll::Pending => {
779            // It did not complete immediately; add it to
780            // `ConcurrentState::futures` so it will be polled via the event
781            // loop; then use `GuestTask::sync_call_set` to wait for the task to
782            // complete, suspending the current fiber until it does so.
783            let state = store.concurrent_state_mut();
784            state.push_future(future);
785
786            let set = state.get_mut(caller.task)?.sync_call_set;
787            Waitable::Host(task).join(state, Some(set))?;
788
789            store.suspend(SuspendReason::Waiting {
790                set,
791                thread: caller,
792                skip_may_block_check: false,
793            })?;
794        }
795    }
796
797    // Retrieve and return the result.
798    Ok(*mem::replace(
799        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
800        old_result,
801    )
802    .unwrap()
803    .downcast()
804    .unwrap())
805}
806
807/// Execute the specified guest call.
808fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
809    let mut next = Some(call);
810    while let Some(call) = next.take() {
811        match call.kind {
812            GuestCallKind::DeliverEvent { instance, set } => {
813                let (event, waitable) = instance
814                    .get_event(store, call.thread.task, set, true)?
815                    .unwrap();
816                let state = store.concurrent_state_mut();
817                let task = state.get_mut(call.thread.task)?;
818                let runtime_instance = task.instance;
819                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
820
821                log::trace!(
822                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
823                    call.thread,
824                );
825
826                let old_thread = store.set_thread(Some(call.thread));
827                log::trace!(
828                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
829                    call.thread
830                );
831
832                store.maybe_push_call_context(call.thread.task)?;
833
834                store.enter_instance(runtime_instance);
835
836                let callback = store
837                    .concurrent_state_mut()
838                    .get_mut(call.thread.task)?
839                    .callback
840                    .take()
841                    .unwrap();
842
843                let code = callback(store, event, handle)?;
844
845                store
846                    .concurrent_state_mut()
847                    .get_mut(call.thread.task)?
848                    .callback = Some(callback);
849
850                store.exit_instance(runtime_instance)?;
851
852                store.maybe_pop_call_context(call.thread.task)?;
853
854                store.set_thread(old_thread);
855
856                next = instance.handle_callback_code(
857                    store,
858                    call.thread,
859                    runtime_instance.index,
860                    code,
861                )?;
862
863                log::trace!(
864                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
865                );
866            }
867            GuestCallKind::StartImplicit(fun) => {
868                next = fun(store)?;
869            }
870            GuestCallKind::StartExplicit(fun) => {
871                fun(store)?;
872            }
873        }
874    }
875
876    Ok(())
877}
878
879impl<T> Store<T> {
880    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
881    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
882    where
883        T: Send + 'static,
884    {
885        ensure!(
886            self.as_context().0.concurrency_support(),
887            "cannot use `run_concurrent` when Config::concurrency_support disabled",
888        );
889        self.as_context_mut().run_concurrent(fun).await
890    }
891
892    #[doc(hidden)]
893    pub fn assert_concurrent_state_empty(&mut self) {
894        self.as_context_mut().assert_concurrent_state_empty();
895    }
896
897    /// Convenience wrapper for [`StoreContextMut::spawn`].
898    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
899    where
900        T: 'static,
901    {
902        self.as_context_mut().spawn(task)
903    }
904}
905
906impl<T> StoreContextMut<'_, T> {
907    /// Assert that all the relevant tables and queues in the concurrent state
908    /// for this store are empty.
909    ///
910    /// This is for sanity checking in integration tests
911    /// (e.g. `component-async-tests`) that the relevant state has been cleared
912    /// after each test concludes.  This should help us catch leaks, e.g. guest
913    /// tasks which haven't been deleted despite having completed and having
914    /// been dropped by their supertasks.
915    #[doc(hidden)]
916    pub fn assert_concurrent_state_empty(self) {
917        let store = self.0;
918        store
919            .store_data_mut()
920            .components
921            .assert_instance_states_empty();
922        let state = store.concurrent_state_mut();
923        assert!(
924            state.table.get_mut().is_empty(),
925            "non-empty table: {:?}",
926            state.table.get_mut()
927        );
928        assert!(state.high_priority.is_empty());
929        assert!(state.low_priority.is_empty());
930        assert!(state.guest_thread.is_none());
931        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
932        assert!(state.global_error_context_ref_counts.is_empty());
933    }
934
935    /// Spawn a background task to run as part of this store's event loop.
936    ///
937    /// The task will receive an `&Accessor<T>` and run concurrently with
938    /// any other tasks in progress for the store.
939    ///
940    /// Note that the task will only make progress if and when the event loop
941    /// for this store is run.
942    ///
943    /// The returned [`JoinHandle`] may be used to cancel the task.
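    ///
    /// For example (a sketch, where `store` is a `Store` and `FlushTask` is a
    /// hypothetical [`AccessorTask`] implementation):
    ///
    /// ```ignore
    /// let _handle = store.as_context_mut().spawn(FlushTask);
    /// // The task only runs while the store's event loop runs, e.g. inside
    /// // `run_concurrent`.
    /// ```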
944    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
945    where
946        T: 'static,
947    {
948        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
949        self.spawn_with_accessor(accessor, task)
950    }
951
952    /// Internal implementation of `spawn` functions where a `store` is
953    /// available along with an `Accessor`.
954    fn spawn_with_accessor<D>(
955        self,
956        accessor: Accessor<T, D>,
957        task: impl AccessorTask<T, D>,
958    ) -> JoinHandle
959    where
960        T: 'static,
961        D: HasData + ?Sized,
962    {
963        // Create an "abortable future" here where internally the future will
964        // hook calls to poll and possibly spawn more background tasks on each
965        // iteration.
966        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
967        self.0
968            .concurrent_state_mut()
969            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
970        handle
971    }
972
973    /// Run the specified closure `fun` to completion as part of this store's
974    /// event loop.
975    ///
976    /// This will run `fun` as part of this store's event loop until it
977    /// yields a result.  `fun` is provided an [`Accessor`], which provides
978    /// controlled access to the store and its data.
979    ///
980    /// This function can be used to invoke [`Func::call_concurrent`] for
981    /// example within the async closure provided here.
982    ///
983    /// This function will unconditionally return an error if
984    /// [`Config::concurrency_support`] is disabled.
985    ///
986    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
987    ///
988    /// # Store-blocking behavior
989    ///
990    /// At this time there are certain situations in which the `Future` returned
991    /// by the `AsyncFnOnce` passed to this function will not be polled for an
992    /// extended period of time, despite one or more `Waker::wake` events having
993    /// occurred for the task to which it belongs.  This can manifest as the
994    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
995    /// the `Store` being held by e.g. a blocking host function, preventing the
996    /// `Future` from being polled. A canonical example of this is when the
997    /// `fun` provided to this function attempts to set a timeout for an
998    /// invocation of a wasm function. In this situation the async closure is
999    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1000    /// to elapse. At this time this setup will not always work and the timeout
1001    /// may not reliably fire.
1002    ///
1003    /// This function will not block the current thread and as such is always
1004    /// suitable to run in an `async` context, but the current implementation of
1005    /// Wasmtime can lead to situations where a certain wasm computation is
1006    /// required to make progress before the closure can make progress. This is an
1007    /// artifact of Wasmtime's historical implementation of `async` functions
1008    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1009    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1010    /// progress for a readiness notification of (b) to get delivered.
1011    ///
1012    /// This effectively means that it's not possible to reliably perform a
1013    /// "select" operation within the `fun` closure, which timeouts, for example,
1014    /// are based on. Fixing this requires some relatively major refactoring
1015    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1016    /// that is intended to be fixed one day. In the meantime it's recommended
1017    /// to apply timeouts or such to the entire `run_concurrent` call itself
1018    /// rather than internally.
1019    ///
1020    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1021    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
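    ///
    /// As a sketch (assuming a Tokio runtime is available, with `foo` and
    /// `arg` as placeholders), that means wrapping the whole call rather than
    /// racing a timer inside the closure:
    ///
    /// ```ignore
    /// let result = tokio::time::timeout(
    ///     std::time::Duration::from_secs(5),
    ///     store.run_concurrent(async |accessor| {
    ///         foo.call_concurrent(accessor, (arg,)).await
    ///     }),
    /// )
    /// .await;
    /// ```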
1022    ///
1023    /// # Example
1024    ///
1025    /// ```
1026    /// # use {
1027    /// #   wasmtime::{
1028    /// #     error::{Result},
1029    /// #     component::{ Component, Linker, Resource, ResourceTable},
1030    /// #     Config, Engine, Store
1031    /// #   },
1032    /// # };
1033    /// #
1034    /// # struct MyResource(u32);
1035    /// # struct Ctx { table: ResourceTable }
1036    /// #
1037    /// # async fn foo() -> Result<()> {
1038    /// # let mut config = Config::new();
1039    /// # let engine = Engine::new(&config)?;
1040    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1041    /// # let mut linker = Linker::new(&engine);
1042    /// # let component = Component::new(&engine, "")?;
1043    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1044    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1045    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1046    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1047    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1048    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1049    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1050    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1051    ///    Ok(())
1052    /// }).await??;
1053    /// # Ok(())
1054    /// # }
1055    /// ```
1056    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1057    where
1058        T: Send + 'static,
1059    {
1060        ensure!(
1061            self.0.concurrency_support(),
1062            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1063        );
1064        self.do_run_concurrent(fun, false).await
1065    }
1066
1067    pub(super) async fn run_concurrent_trap_on_idle<R>(
1068        self,
1069        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1070    ) -> Result<R>
1071    where
1072        T: Send + 'static,
1073    {
1074        self.do_run_concurrent(fun, true).await
1075    }
1076
1077    async fn do_run_concurrent<R>(
1078        mut self,
1079        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1080        trap_on_idle: bool,
1081    ) -> Result<R>
1082    where
1083        T: Send + 'static,
1084    {
1085        debug_assert!(self.0.concurrency_support());
1086        check_recursive_run();
1087        let token = StoreToken::new(self.as_context_mut());
1088
1089        struct Dropper<'a, T: 'static, V> {
1090            store: StoreContextMut<'a, T>,
1091            value: ManuallyDrop<V>,
1092        }
1093
1094        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1095            fn drop(&mut self) {
1096                tls::set(self.store.0, || {
1097                    // SAFETY: Here we drop the value without moving it for the
1098                    // first and only time -- per the contract for `Drop::drop`,
1099                    // this code won't run again, and the `value` field will no
1100                    // longer be accessible.
1101                    unsafe { ManuallyDrop::drop(&mut self.value) }
1102                });
1103            }
1104        }
1105
1106        let accessor = &Accessor::new(token);
1107        let dropper = &mut Dropper {
1108            store: self,
1109            value: ManuallyDrop::new(fun(accessor)),
1110        };
1111        // SAFETY: We never move `dropper` nor its `value` field.
1112        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1113
1114        dropper
1115            .store
1116            .as_context_mut()
1117            .poll_until(future, trap_on_idle)
1118            .await
1119    }
1120
1121    /// Run this store's event loop.
1122    ///
1123    /// The returned future will resolve when the specified future completes or,
1124    /// if `trap_on_idle` is true, when the event loop can't make further
1125    /// progress.
1126    async fn poll_until<R>(
1127        mut self,
1128        mut future: Pin<&mut impl Future<Output = R>>,
1129        trap_on_idle: bool,
1130    ) -> Result<R>
1131    where
1132        T: Send + 'static,
1133    {
1134        struct Reset<'a, T: 'static> {
1135            store: StoreContextMut<'a, T>,
1136            futures: Option<FuturesUnordered<HostTaskFuture>>,
1137        }
1138
1139        impl<'a, T> Drop for Reset<'a, T> {
1140            fn drop(&mut self) {
1141                if let Some(futures) = self.futures.take() {
1142                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1143                }
1144            }
1145        }
1146
1147        loop {
1148            // Take `ConcurrentState::futures` out of the store so we can poll
1149            // it while also safely giving any of the futures inside access to
1150            // `self`.
1151            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1152            let mut reset = Reset {
1153                store: self.as_context_mut(),
1154                futures,
1155            };
1156            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1157
1158            enum PollResult<R> {
1159                Complete(R),
1160                ProcessWork(Vec<WorkItem>),
1161            }
1162            let result = future::poll_fn(|cx| {
1163                // First, poll the future we were passed as an argument and
1164                // return immediately if it's ready.
1165                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1166                    return Poll::Ready(Ok(PollResult::Complete(value)));
1167                }
1168
1169                // Next, poll `ConcurrentState::futures` (which includes any
1170                // pending host tasks and/or background tasks), returning
1171                // immediately if one of them fails.
1172                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1173                    Poll::Ready(Some(output)) => {
1174                        match output {
1175                            Err(e) => return Poll::Ready(Err(e)),
1176                            Ok(()) => {}
1177                        }
1178                        Poll::Ready(true)
1179                    }
1180                    Poll::Ready(None) => Poll::Ready(false),
1181                    Poll::Pending => Poll::Pending,
1182                };
1183
1184                // Next, collect the next batch of work items to process, if any.
1185                // This will be either all of the high-priority work items, or if
1186                // there are none, a single low-priority work item.
1187                let state = reset.store.0.concurrent_state_mut();
1188                let ready = state.collect_work_items_to_run();
1189                if !ready.is_empty() {
1190                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1191                }
1192
1193                // Finally, if we have nothing else to do right now, determine what to do
1194                // based on whether there are any pending futures in
1195                // `ConcurrentState::futures`.
1196                return match next {
1197                    Poll::Ready(true) => {
1198                        // In this case, one of the futures in
1199                        // `ConcurrentState::futures` completed
1200                        // successfully, so we return now and continue
1201                        // the outer loop in case there is another one
1202                        // ready to complete.
1203                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1204                    }
1205                    Poll::Ready(false) => {
1206                        // Poll the future we were passed one last time
1207                        // in case one of `ConcurrentState::futures` had
1208                        // the side effect of unblocking it.
1209                        if let Poll::Ready(value) =
1210                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1211                        {
1212                            Poll::Ready(Ok(PollResult::Complete(value)))
1213                        } else {
1214                            // In this case, there are no more pending
1215                            // futures in `ConcurrentState::futures`,
1216                            // there are no remaining work items, _and_
1217                            // the future we were passed as an argument
1218                            // still hasn't completed.
1219                            if trap_on_idle {
1220                                // `trap_on_idle` is true, so we exit
1221                                // immediately.
1222                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1223                            } else {
1224                                // `trap_on_idle` is false, so we assume
1225                                // that future will wake up and give us
1226                                // more work to do when it's ready to.
1227                                Poll::Pending
1228                            }
1229                        }
1230                    }
1231                    // There is at least one pending future in
1232                    // `ConcurrentState::futures` and we have nothing
1233                    // else to do but wait for now, so we return
1234                    // `Pending`.
1235                    Poll::Pending => Poll::Pending,
1236                };
1237            })
1238            .await;
1239
1240            // Put the `ConcurrentState::futures` back into the store before we
1241            // return or handle any work items since one or more of those items
1242            // might append more futures.
1243            drop(reset);
1244
1245            match result? {
1246                // The future we were passed as an argument completed, so we
1247                // return the result.
1248                PollResult::Complete(value) => break Ok(value),
1249                // The future we were passed has not yet completed, so handle
1250                // any work items and then loop again.
1251                PollResult::ProcessWork(ready) => {
1252                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1253                        store: StoreContextMut<'a, T>,
1254                        ready: I,
1255                    }
1256
1257                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1258                        fn drop(&mut self) {
1259                            while let Some(item) = self.ready.next() {
1260                                match item {
1261                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1262                                    WorkItem::PushFuture(future) => {
1263                                        tls::set(self.store.0, move || drop(future))
1264                                    }
1265                                    _ => {}
1266                                }
1267                            }
1268                        }
1269                    }
1270
1271                    let mut dispose = Dispose {
1272                        store: self.as_context_mut(),
1273                        ready: ready.into_iter(),
1274                    };
1275
1276                    while let Some(item) = dispose.ready.next() {
1277                        dispose
1278                            .store
1279                            .as_context_mut()
1280                            .handle_work_item(item)
1281                            .await?;
1282                    }
1283                }
1284            }
1285        }
1286    }
1287
1288    /// Handle the specified work item, possibly resuming a fiber if applicable.
1289    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1290    where
1291        T: Send,
1292    {
1293        log::trace!("handle work item {item:?}");
1294        match item {
1295            WorkItem::PushFuture(future) => {
1296                self.0
1297                    .concurrent_state_mut()
1298                    .futures
1299                    .get_mut()
1300                    .as_mut()
1301                    .unwrap()
1302                    .push(future.into_inner());
1303            }
1304            WorkItem::ResumeFiber(fiber) => {
1305                self.0.resume_fiber(fiber).await?;
1306            }
1307            WorkItem::GuestCall(call) => {
1308                if call.is_ready(self.0)? {
1309                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1310                } else {
1311                    let state = self.0.concurrent_state_mut();
1312                    let task = state.get_mut(call.thread.task)?;
1313                    if !task.starting_sent {
1314                        task.starting_sent = true;
1315                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1316                            Waitable::Guest(call.thread.task).set_event(
1317                                state,
1318                                Some(Event::Subtask {
1319                                    status: Status::Starting,
1320                                }),
1321                            )?;
1322                        }
1323                    }
1324
1325                    let instance = state.get_mut(call.thread.task)?.instance;
1326                    self.0
1327                        .instance_state(instance)
1328                        .concurrent_state()
1329                        .pending
1330                        .insert(call.thread, call.kind);
1331                }
1332            }
1333            WorkItem::WorkerFunction(fun) => {
1334                self.run_on_worker(WorkerItem::Function(fun)).await?;
1335            }
1336        }
1337
1338        Ok(())
1339    }
1340
1341    /// Execute the specified guest call on a worker fiber.
1342    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1343    where
1344        T: Send,
1345    {
1346        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1347            fiber
1348        } else {
1349            fiber::make_fiber(self.0, move |store| {
1350                loop {
1351                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1352                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1353                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1354                    }
1355
1356                    store.suspend(SuspendReason::NeedWork)?;
1357                }
1358            })?
1359        };
1360
1361        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1362        assert!(worker_item.is_none());
1363        *worker_item = Some(item);
1364
1365        self.0.resume_fiber(worker).await
1366    }
1367
1368    /// Wrap the specified host function in a future which will call it, passing
1369    /// it an `&Accessor<T>`.
1370    ///
1371    /// See the `Accessor` documentation for details.
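    ///
    /// A minimal sketch of the kind of closure this accepts (the host data
    /// type and the closure body are illustrative only; all `wrap_call` itself
    /// requires is the `&Accessor<T>` argument and the boxed future):
    ///
    /// ```ignore
    /// let fut = store.wrap_call(|accessor: &Accessor<MyHostData>| {
    ///     Box::pin(async move {
    ///         // `Accessor::with` grants store access between await points.
    ///         accessor.with(|_access| {
    ///             // ... touch the store and host data here ...
    ///         });
    ///         Ok(42u32)
    ///     })
    /// });
    /// ```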
1372    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1373    where
1374        T: 'static,
1375        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1376            + Send
1377            + Sync
1378            + 'static,
1379        R: Send + Sync + 'static,
1380    {
1381        let token = StoreToken::new(self);
1382        async move {
1383            let mut accessor = Accessor::new(token);
1384            closure(&mut accessor).await
1385        }
1386    }
1387}
1388
1389impl StoreOpaque {
1390    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1391    /// guest-to-guest call or a sync host-to-guest call.
1392    ///
1393    /// This task will only be used for the purpose of handling calls to
1394    /// intrinsic functions; both parameter lowering and result lifting are
1395    /// assumed to be taken care of elsewhere.
1396    pub(crate) fn enter_sync_call(
1397        &mut self,
1398        guest_caller: Option<RuntimeInstance>,
1399        callee_async: bool,
1400        callee: RuntimeInstance,
1401    ) -> Result<()> {
1402        log::trace!("enter sync call {callee:?}");
1403
1404        let state = self.concurrent_state_mut();
1405        let thread = state.guest_thread;
1406        let instance = if let Some(thread) = thread {
1407            Some(state.get_mut(thread.task)?.instance)
1408        } else {
1409            None
1410        };
1411        let task = GuestTask::new(
1412            state,
1413            Box::new(move |_, _| unreachable!()),
1414            LiftResult {
1415                lift: Box::new(move |_, _| unreachable!()),
1416                ty: TypeTupleIndex::reserved_value(),
1417                memory: None,
1418                string_encoding: StringEncoding::Utf8,
1419            },
1420            if let Some(caller) = guest_caller {
1421                assert_eq!(caller, instance.unwrap());
1422                Caller::Guest {
1423                    thread: thread.unwrap(),
1424                }
1425            } else {
1426                Caller::Host {
1427                    tx: None,
1428                    exit_tx: Arc::new(oneshot::channel().0),
1429                    host_future_present: false,
1430                    caller: state.guest_thread,
1431                }
1432            },
1433            None,
1434            callee,
1435            callee_async,
1436        )?;
1437
1438        let guest_task = state.push(task)?;
1439        let new_thread = GuestThread::new_implicit(guest_task);
1440        let guest_thread = state.push(new_thread)?;
1441        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1442            guest_thread,
1443            self,
1444            callee.index,
1445        )?;
1446
1447        let state = self.concurrent_state_mut();
1448        state.get_mut(guest_task)?.threads.insert(guest_thread);
1449        if guest_caller.is_some() {
1450            let thread = state.guest_thread.unwrap();
1451            state.get_mut(thread.task)?.subtasks.insert(guest_task);
1452        }
1453
1454        self.set_thread(Some(QualifiedThreadId {
1455            task: guest_task,
1456            thread: guest_thread,
1457        }));
1458
1459        Ok(())
1460    }
1461
1462    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
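    ///
    /// A minimal sketch of how the two functions are intended to pair up
    /// around a synchronous call from the host (`do_the_sync_call` is
    /// hypothetical; a guest caller would pass `Some(caller)` and `true`
    /// rather than `None` and `false`):
    ///
    /// ```ignore
    /// store.enter_sync_call(None, callee_async, callee)?;
    /// let result = do_the_sync_call(store);
    /// store.exit_sync_call(false)?;
    /// result?;
    /// ```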
1463    pub(crate) fn exit_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1464        let thread = self.set_thread(None).unwrap();
1465        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1466        log::trace!("exit sync call {instance:?}");
1467        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1468            self,
1469            thread,
1470            instance.index,
1471        )?;
1472
1473        let state = self.concurrent_state_mut();
1474        let task = state.get_mut(thread.task)?;
1475        let caller = match &task.caller {
1476            &Caller::Guest { thread } => {
1477                assert!(guest_caller);
1478                Some(thread)
1479            }
1480            &Caller::Host { caller, .. } => {
1481                assert!(!guest_caller);
1482                caller
1483            }
1484        };
1485        self.set_thread(caller);
1486
1487        let state = self.concurrent_state_mut();
1488        let task = state.get_mut(thread.task)?;
1489        if task.ready_to_delete() {
1490            state.delete(thread.task)?.dispose(state, thread.task)?;
1491        }
1492
1493        Ok(())
1494    }
1495
1496    /// Determine whether the specified instance may be entered from the host.
1497    ///
1498    /// We return `true` here only if all of the following hold:
1499    ///
1500    /// - The top-level instance is not already on the current task's call stack.
1501    /// - The instance is not in need of a post-return function call.
1502    /// - `self` has not been poisoned due to a trap.
1503    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1504        if !self.concurrency_support() {
1505            return !self.trapped();
1506        }
1507        let state = self.concurrent_state_mut();
1508        if let Some(caller) = state.guest_thread {
1509            instance != state.get_mut(caller.task).unwrap().instance
1510                && self.may_enter_from_caller(caller.task, instance)
1511        } else {
1512            !self.trapped()
1513        }
1514    }
1515
1516    /// Variation of `may_enter` which takes a `TableId<GuestTask>` representing
1517    /// the callee.
1518    fn may_enter_task(&mut self, task: TableId<GuestTask>) -> bool {
1519        let instance = self.concurrent_state_mut().get_mut(task).unwrap().instance;
1520        self.may_enter_from_caller(task, instance)
1521    }
1522
1523    /// Variation of `may_enter` which takes a `TableId<GuestTask>` representing
1524    /// the caller, plus a `RuntimeInstance` representing the callee.
1525    fn may_enter_from_caller(
1526        &mut self,
1527        mut guest_task: TableId<GuestTask>,
1528        instance: RuntimeInstance,
1529    ) -> bool {
1530        !self.trapped() && {
1531            let state = self.concurrent_state_mut();
1532            let guest_instance = instance.instance;
1533            loop {
1534                // Note that we only compare top-level instance IDs here.  The
1535                // idea is that the host is not allowed to recursively enter a
1536                // top-level instance even if the specific leaf instance is not
1537                // on the stack.  This is the behavior defined in the spec, and it
1538                // allows us to elide runtime checks in guest-to-guest adapters.
1539                let next_thread = match &state.get_mut(guest_task).unwrap().caller {
1540                    Caller::Host { caller: None, .. } => break true,
1541                    &Caller::Host {
1542                        caller: Some(caller),
1543                        ..
1544                    } => {
1545                        let instance = state.get_mut(caller.task).unwrap().instance;
1546                        if instance.instance == guest_instance {
1547                            break false;
1548                        } else {
1549                            caller
1550                        }
1551                    }
1552                    &Caller::Guest { thread } => {
1553                        if state.get_mut(thread.task).unwrap().instance.instance == guest_instance {
1554                            break false;
1555                        } else {
1556                            thread
1557                        }
1558                    }
1559                };
1560                guest_task = next_thread.task;
1561            }
1562        }
1563    }
1564
1565    /// Helper function to retrieve the `InstanceState` for the
1566    /// specified instance.
1567    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1568        self.component_instance_mut(instance.instance)
1569            .instance_state(instance.index)
1570    }
1571
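    /// Swap the store's notion of the current guest thread, returning the
    /// previous one (if any).
    ///
    /// This also maintains the per-instance `task_may_block` flag; see the
    /// comments in the body for details.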
1572    fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
1573        // Each time we switch threads, we conservatively set `task_may_block`
1574        // to `false` for the component instance we're switching away from (if
1575        // any), meaning it will be `false` for any new thread created for that
1576        // instance unless explicitly set otherwise.
1577        let state = self.concurrent_state_mut();
1578        let old_thread = state.guest_thread.take();
1579        if let Some(old_thread) = old_thread {
1580            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1581            self.component_instance_mut(instance)
1582                .set_task_may_block(false)
1583        }
1584
1585        self.concurrent_state_mut().guest_thread = thread;
1586
1587        // If we're switching to a new thread, set its component instance's
1588        // `task_may_block` according to where it left off.
1589        if thread.is_some() {
1590            self.set_task_may_block();
1591        }
1592
1593        old_thread
1594    }
1595
1596    /// Set the global variable representing whether the current task may block
1597    /// prior to entering Wasm code.
1598    fn set_task_may_block(&mut self) {
1599        let state = self.concurrent_state_mut();
1600        let guest_thread = state.guest_thread.unwrap();
1601        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1602        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1603        self.component_instance_mut(instance)
1604            .set_task_may_block(may_block)
1605    }
1606
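    /// Return an error (`Trap::CannotBlockSyncTask`) if the current guest
    /// task is not currently permitted to block.
    ///
    /// This is a no-op when concurrency support is disabled.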
1607    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1608        if !self.concurrency_support() {
1609            return Ok(());
1610        }
1611        let state = self.concurrent_state_mut();
1612        let task = state.guest_thread.unwrap().task;
1613        let instance = state.get_mut(task).unwrap().instance.instance;
1614        let task_may_block = self.component_instance(instance).get_task_may_block();
1615
1616        if task_may_block {
1617            Ok(())
1618        } else {
1619            Err(Trap::CannotBlockSyncTask.into())
1620        }
1621    }
1622
1623    /// Record that we're about to enter a (sub-)component instance which does
1624    /// not support more than one concurrent, stackful activation, meaning it
1625    /// cannot be entered again until the next call returns.
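    ///
    /// Together with `Self::exit_instance`, this brackets a call into such an
    /// instance, e.g. (a sketch of the pattern used by `queue_call` below):
    ///
    /// ```ignore
    /// store.enter_instance(callee_instance);
    /// let storage = call(store)?; // run the guest export
    /// store.exit_instance(callee_instance)?;
    /// ```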
1626    fn enter_instance(&mut self, instance: RuntimeInstance) {
1627        log::trace!("enter {instance:?}");
1628        self.instance_state(instance)
1629            .concurrent_state()
1630            .do_not_enter = true;
1631    }
1632
1633    /// Record that we've exited a (sub-)component instance previously entered
1634    /// with `Self::enter_instance`, then call `Self::partition_pending`.
1635    /// See the documentation for the latter for details.
1636    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1637        log::trace!("exit {instance:?}");
1638        self.instance_state(instance)
1639            .concurrent_state()
1640            .do_not_enter = false;
1641        self.partition_pending(instance)
1642    }
1643
1644    /// Iterate over `InstanceState::pending`, moving any ready items into the
1645    /// "high priority" work item queue.
1646    ///
1647    /// See `GuestCall::is_ready` for details.
1648    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1649        for (thread, kind) in
1650            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1651        {
1652            let call = GuestCall { thread, kind };
1653            if call.is_ready(self)? {
1654                self.concurrent_state_mut()
1655                    .push_high_priority(WorkItem::GuestCall(call));
1656            } else {
1657                self.instance_state(instance)
1658                    .concurrent_state()
1659                    .pending
1660                    .insert(call.thread, call.kind);
1661            }
1662        }
1663
1664        Ok(())
1665    }
1666
1667    /// Implements the `backpressure.{inc,dec}` intrinsics.
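    ///
    /// The two intrinsics differ only in the `modify` closure they supply;
    /// roughly (a sketch, not the actual intrinsic call sites):
    ///
    /// ```ignore
    /// // backpressure.inc: overflowing the counter is an error.
    /// store.backpressure_modify(caller_instance, |n| n.checked_add(1))?;
    /// // backpressure.dec: underflowing the counter is likewise an error.
    /// store.backpressure_modify(caller_instance, |n| n.checked_sub(1))?;
    /// ```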
1668    pub(crate) fn backpressure_modify(
1669        &mut self,
1670        caller_instance: RuntimeInstance,
1671        modify: impl FnOnce(u16) -> Option<u16>,
1672    ) -> Result<()> {
1673        let state = self.instance_state(caller_instance).concurrent_state();
1674        let old = state.backpressure;
1675        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1676        state.backpressure = new;
1677
1678        if old > 0 && new == 0 {
1679            // Backpressure was previously enabled and is now disabled; move any
1680            // newly-eligible guest calls to the "high priority" queue.
1681            self.partition_pending(caller_instance)?;
1682        }
1683
1684        Ok(())
1685    }
1686
1687    /// Resume the specified fiber, giving it exclusive access to this
1688    /// store.
1689    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1690        let old_thread = self.concurrent_state_mut().guest_thread;
1691        log::trace!("resume_fiber: save current thread {old_thread:?}");
1692
1693        let fiber = fiber::resolve_or_release(self, fiber).await?;
1694
1695        self.set_thread(old_thread);
1696
1697        let state = self.concurrent_state_mut();
1698
1699        if let Some(ref ot) = old_thread {
1700            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1701        }
1702        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1703
1704        if let Some(mut fiber) = fiber {
1705            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1706            // See the `SuspendReason` documentation for what each case means.
1707            match state.suspend_reason.take().unwrap() {
1708                SuspendReason::NeedWork => {
1709                    if state.worker.is_none() {
1710                        state.worker = Some(fiber);
1711                    } else {
1712                        fiber.dispose(self);
1713                    }
1714                }
1715                SuspendReason::Yielding { thread, .. } => {
1716                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1717                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1718                }
1719                SuspendReason::ExplicitlySuspending { thread, .. } => {
1720                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1721                }
1722                SuspendReason::Waiting { set, thread, .. } => {
1723                    let old = state
1724                        .get_mut(set)?
1725                        .waiting
1726                        .insert(thread, WaitMode::Fiber(fiber));
1727                    assert!(old.is_none());
1728                }
1729            };
1730        } else {
1731            log::trace!("resume_fiber: fiber has exited");
1732        }
1733
1734        Ok(())
1735    }
1736
1737    /// Suspend the current fiber, storing the reason in
1738    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1739    /// it should be resumed.
1740    ///
1741    /// See the `SuspendReason` documentation for details.
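    ///
    /// For example, `wait_for_event` below suspends until an event arrives
    /// for a waitable set with a call along the lines of:
    ///
    /// ```ignore
    /// self.suspend(SuspendReason::Waiting {
    ///     set,
    ///     thread: caller,
    ///     skip_may_block_check: false,
    /// })?;
    /// ```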
1742    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1743        log::trace!("suspend fiber: {reason:?}");
1744
1745        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1746        // pop the call context which manages resource borrows before suspending
1747        // and then push it again once we've resumed.
1748        let task = match &reason {
1749            SuspendReason::Yielding { thread, .. }
1750            | SuspendReason::Waiting { thread, .. }
1751            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1752            SuspendReason::NeedWork => None,
1753        };
1754
1755        let old_guest_thread = if let Some(task) = task {
1756            self.maybe_pop_call_context(task)?;
1757            self.concurrent_state_mut().guest_thread
1758        } else {
1759            None
1760        };
1761
1762        // We should not have reached here unless either there's no current
1763        // task, or the current task is permitted to block.  In addition, we
1764        // special-case `thread.switch-to` and waiting for a subtask to go from
1765        // `starting` to `started`, both of which we consider non-blocking
1766        // operations despite requiring a suspend.
1767        assert!(
1768            matches!(
1769                reason,
1770                SuspendReason::ExplicitlySuspending {
1771                    skip_may_block_check: true,
1772                    ..
1773                } | SuspendReason::Waiting {
1774                    skip_may_block_check: true,
1775                    ..
1776                } | SuspendReason::Yielding {
1777                    skip_may_block_check: true,
1778                    ..
1779                }
1780            ) || old_guest_thread
1781                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1782                .unwrap_or(true)
1783        );
1784
1785        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1786        assert!(suspend_reason.is_none());
1787        *suspend_reason = Some(reason);
1788
1789        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1790
1791        if let Some(task) = task {
1792            self.set_thread(old_guest_thread);
1793            self.maybe_push_call_context(task)?;
1794        }
1795
1796        Ok(())
1797    }
1798
1799    /// Push the call context for managing resource borrows for the specified
1800    /// guest task if it has not yet either returned a result or cancelled
1801    /// itself.
1802    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1803        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1804
1805        if !task.returned_or_cancelled() {
1806            log::trace!("push call context for {guest_task:?}");
1807            let call_context = task.call_context.take().unwrap();
1808            self.component_resource_state().0.push(call_context);
1809        }
1810        Ok(())
1811    }
1812
1813    /// Pop the call context for managing resource borrows for the specified
1814    /// guest task if it has not yet either returned a result or cancelled
1815    /// itself.
1816    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1817        if !self
1818            .concurrent_state_mut()
1819            .get_mut(guest_task)?
1820            .returned_or_cancelled()
1821        {
1822            log::trace!("pop call context for {guest_task:?}");
1823            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1824            self.concurrent_state_mut()
1825                .get_mut(guest_task)?
1826                .call_context = call_context;
1827        }
1828        Ok(())
1829    }
1830
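    /// Suspend the current guest thread until an event is delivered for
    /// `waitable`, temporarily joining it to the caller task's
    /// `sync_call_set` and restoring its original set afterwards.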
1831    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1832        let state = self.concurrent_state_mut();
1833        let caller = state.guest_thread.unwrap();
1834        let old_set = waitable.common(state)?.set;
1835        let set = state.get_mut(caller.task)?.sync_call_set;
1836        waitable.join(state, Some(set))?;
1837        self.suspend(SuspendReason::Waiting {
1838            set,
1839            thread: caller,
1840            skip_may_block_check: false,
1841        })?;
1842        let state = self.concurrent_state_mut();
1843        waitable.join(state, old_set)
1844    }
1845}
1846
1847impl Instance {
1848    /// Get the next pending event for the specified task and (optional)
1849    /// waitable set, along with the waitable handle if applicable.
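    ///
    /// The returned pair is `(event, Some((waitable, handle)))` when the
    /// event came from a waitable in `set`, or `(event, None)` when it had
    /// been stored directly on the task (e.g. a pending `Event::Cancelled`).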
1850    fn get_event(
1851        self,
1852        store: &mut StoreOpaque,
1853        guest_task: TableId<GuestTask>,
1854        set: Option<TableId<WaitableSet>>,
1855        cancellable: bool,
1856    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1857        let state = store.concurrent_state_mut();
1858
1859        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1860            log::trace!("deliver event {event:?} to {guest_task:?}");
1861
1862            if cancellable || !matches!(event, Event::Cancelled) {
1863                return Ok(Some((event, None)));
1864            } else {
1865                state.get_mut(guest_task)?.event = Some(event);
1866            }
1867        }
1868
1869        Ok(
1870            if let Some((set, waitable)) = set
1871                .and_then(|set| {
1872                    state
1873                        .get_mut(set)
1874                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1875                        .transpose()
1876                })
1877                .transpose()?
1878            {
1879                let common = waitable.common(state)?;
1880                let handle = common.handle.unwrap();
1881                let event = common.event.take().unwrap();
1882
1883                log::trace!(
1884                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1885                );
1886
1887                waitable.on_delivery(store, self, event);
1888
1889                Some((event, Some((waitable, handle))))
1890            } else {
1891                None
1892            },
1893        )
1894    }
1895
1896    /// Handle the `CallbackCode` returned from an async-lifted export or its
1897    /// callback.
1898    ///
1899    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1900    /// using `handle_guest_call`.
1901    fn handle_callback_code(
1902        self,
1903        store: &mut StoreOpaque,
1904        guest_thread: QualifiedThreadId,
1905        runtime_instance: RuntimeComponentInstanceIndex,
1906        code: u32,
1907    ) -> Result<Option<GuestCall>> {
1908        let (code, set) = unpack_callback_code(code);
1909
1910        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1911
1912        let state = store.concurrent_state_mut();
1913
1914        let get_set = |store: &mut StoreOpaque, handle| {
1915            if handle == 0 {
1916                bail!("invalid waitable-set handle");
1917            }
1918
1919            let set = store
1920                .instance_state(RuntimeInstance {
1921                    instance: self.id().instance(),
1922                    index: runtime_instance,
1923                })
1924                .handle_table()
1925                .waitable_set_rep(handle)?;
1926
1927            Ok(TableId::<WaitableSet>::new(set))
1928        };
1929
1930        Ok(match code {
1931            callback_code::EXIT => {
1932                log::trace!("implicit thread {guest_thread:?} completed");
1933                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1934                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1935                if task.threads.is_empty() && !task.returned_or_cancelled() {
1936                    bail!(Trap::NoAsyncResult);
1937                }
1938                match &task.caller {
1939                    Caller::Host { .. } => {
1940                        if task.ready_to_delete() {
1941                            Waitable::Guest(guest_thread.task)
1942                                .delete_from(store.concurrent_state_mut())?;
1943                        }
1944                    }
1945                    Caller::Guest { .. } => {
1946                        task.exited = true;
1947                        task.callback = None;
1948                    }
1949                }
1950                None
1951            }
1952            callback_code::YIELD => {
1953                let task = state.get_mut(guest_thread.task)?;
1954                // If an `Event::Cancelled` is pending, we'll deliver that;
1955                // otherwise, we'll deliver `Event::None`.  Note that
1956                // `GuestTask::event` is only ever set to one of those two
1957                // `Event` variants.
1958                if let Some(event) = task.event {
1959                    assert!(matches!(event, Event::None | Event::Cancelled));
1960                } else {
1961                    task.event = Some(Event::None);
1962                }
1963                let call = GuestCall {
1964                    thread: guest_thread,
1965                    kind: GuestCallKind::DeliverEvent {
1966                        instance: self,
1967                        set: None,
1968                    },
1969                };
1970                if state.may_block(guest_thread.task) {
1971                    // Push this thread onto the "low priority" queue so it runs
1972                    // after any other threads have had a chance to run.
1973                    state.push_low_priority(WorkItem::GuestCall(call));
1974                    None
1975                } else {
1976                    // Yielding in a non-blocking context is defined as a no-op
1977                    // according to the spec, so we must run this thread
1978                    // immediately without allowing any others to run.
1979                    Some(call)
1980                }
1981            }
1982            callback_code::WAIT => {
1983                // The task may only return `WAIT` if it was created for a call
1984                // to an async export.  Otherwise, we'll trap.
1985                state.check_blocking_for(guest_thread.task)?;
1986
1987                let set = get_set(store, set)?;
1988                let state = store.concurrent_state_mut();
1989
1990                if state.get_mut(guest_thread.task)?.event.is_some()
1991                    || !state.get_mut(set)?.ready.is_empty()
1992                {
1993                    // An event is immediately available; deliver it ASAP.
1994                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1995                        thread: guest_thread,
1996                        kind: GuestCallKind::DeliverEvent {
1997                            instance: self,
1998                            set: Some(set),
1999                        },
2000                    }));
2001                } else {
2002                    // No event is immediately available.
2003                    //
2004                    // We're waiting, so register to be woken up when an event
2005                    // is published for this waitable set.
2006                    //
2007                    // Here we also set `GuestTask::wake_on_cancel` which allows
2008                    // `subtask.cancel` to interrupt the wait.
2009                    let old = state
2010                        .get_mut(guest_thread.thread)?
2011                        .wake_on_cancel
2012                        .replace(set);
2013                    assert!(old.is_none());
2014                    let old = state
2015                        .get_mut(set)?
2016                        .waiting
2017                        .insert(guest_thread, WaitMode::Callback(self));
2018                    assert!(old.is_none());
2019                }
2020                None
2021            }
2022            _ => bail!("unsupported callback code: {code}"),
2023        })
2024    }
2025
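    /// Remove the specified guest thread from the instance's thread handle
    /// table, delete it from the concurrent state, and drop it from its
    /// task's set of threads.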
2026    fn cleanup_thread(
2027        self,
2028        store: &mut StoreOpaque,
2029        guest_thread: QualifiedThreadId,
2030        runtime_instance: RuntimeComponentInstanceIndex,
2031    ) -> Result<()> {
2032        let guest_id = store
2033            .concurrent_state_mut()
2034            .get_mut(guest_thread.thread)?
2035            .instance_rep;
2036        store
2037            .instance_state(RuntimeInstance {
2038                instance: self.id().instance(),
2039                index: runtime_instance,
2040            })
2041            .thread_handle_table()
2042            .guest_thread_remove(guest_id.unwrap())?;
2043
2044        store.concurrent_state_mut().delete(guest_thread.thread)?;
2045        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2046        task.threads.remove(&guest_thread.thread);
2047        Ok(())
2048    }
2049
2050    /// Add the specified guest call to the "high priority" work item queue, to
2051    /// be started as soon as backpressure and/or reentrance rules allow.
2052    ///
2053    /// SAFETY: The raw pointer arguments must be valid references to guest
2054    /// functions (with the appropriate signatures) when the closures queued by
2055    /// this function are called.
2056    unsafe fn queue_call<T: 'static>(
2057        self,
2058        mut store: StoreContextMut<T>,
2059        guest_thread: QualifiedThreadId,
2060        callee: SendSyncPtr<VMFuncRef>,
2061        param_count: usize,
2062        result_count: usize,
2063        async_: bool,
2064        callback: Option<SendSyncPtr<VMFuncRef>>,
2065        post_return: Option<SendSyncPtr<VMFuncRef>>,
2066    ) -> Result<()> {
2067        /// Return a closure which will call the specified function in the scope
2068        /// of the specified task.
2069        ///
2070        /// This will use `GuestTask::lower_params` to lower the parameters, but
2071        /// will not lift the result; instead, it returns a
2072        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2073        /// any, may be lifted.  Note that an async-lifted export will have
2074        /// returned its result using the `task.return` intrinsic (or not
2075        /// returned a result at all, in the case of `task.cancel`), in which
2076        /// case the "result" of this call will either be a callback code or
2077        /// nothing.
2078        ///
2079        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2080        /// the returned closure is called.
2081        unsafe fn make_call<T: 'static>(
2082            store: StoreContextMut<T>,
2083            guest_thread: QualifiedThreadId,
2084            callee: SendSyncPtr<VMFuncRef>,
2085            param_count: usize,
2086            result_count: usize,
2087        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2088        + Send
2089        + Sync
2090        + 'static
2091        + use<T> {
2092            let token = StoreToken::new(store);
2093            move |store: &mut dyn VMStore| {
2094                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2095
2096                store
2097                    .concurrent_state_mut()
2098                    .get_mut(guest_thread.thread)?
2099                    .state = GuestThreadState::Running;
2100                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2101                let lower = task.lower_params.take().unwrap();
2102
2103                lower(store, &mut storage[..param_count])?;
2104
2105                let mut store = token.as_context_mut(store);
2106
2107                // SAFETY: Per the contract in `make_call`'s documentation,
2108                // `callee` must be a valid pointer.
2109                unsafe {
2110                    crate::Func::call_unchecked_raw(
2111                        &mut store,
2112                        callee.as_non_null(),
2113                        NonNull::new(
2114                            &mut storage[..param_count.max(result_count)]
2115                                as *mut [MaybeUninit<ValRaw>] as _,
2116                        )
2117                        .unwrap(),
2118                    )?;
2119                }
2120
2121                Ok(storage)
2122            }
2123        }
2124
2125        // SAFETY: Per the contract described in this function documentation,
2126        // the `callee` pointer which `call` closes over must be valid when
2127        // called by the closure we queue below.
2128        let call = unsafe {
2129            make_call(
2130                store.as_context_mut(),
2131                guest_thread,
2132                callee,
2133                param_count,
2134                result_count,
2135            )
2136        };
2137
2138        let callee_instance = store
2139            .0
2140            .concurrent_state_mut()
2141            .get_mut(guest_thread.task)?
2142            .instance;
2143
2144        let fun = if callback.is_some() {
2145            assert!(async_);
2146
2147            Box::new(move |store: &mut dyn VMStore| {
2148                self.add_guest_thread_to_instance_table(
2149                    guest_thread.thread,
2150                    store,
2151                    callee_instance.index,
2152                )?;
2153                let old_thread = store.set_thread(Some(guest_thread));
2154                log::trace!(
2155                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2156                );
2157
2158                store.maybe_push_call_context(guest_thread.task)?;
2159
2160                store.enter_instance(callee_instance);
2161
2162                // SAFETY: See the documentation for `make_call` to review the
2163                // contract we must uphold for `call` here.
2164                //
2165                // Per the contract described in the `queue_call`
2166                // documentation, the `callee` pointer which `call` closes
2167                // over must be valid.
2168                let storage = call(store)?;
2169
2170                store.exit_instance(callee_instance)?;
2171
2172                store.maybe_pop_call_context(guest_thread.task)?;
2173
2174                store.set_thread(old_thread);
2175                let state = store.concurrent_state_mut();
2176                old_thread
2177                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2178                log::trace!("stackless call: restored {old_thread:?} as current thread");
2179
2180                // SAFETY: `wasmparser` will have validated that the callback
2181                // function returns a `i32` result.
2182                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2183
2184                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2185            })
2186                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2187        } else {
2188            let token = StoreToken::new(store.as_context_mut());
2189            Box::new(move |store: &mut dyn VMStore| {
2190                self.add_guest_thread_to_instance_table(
2191                    guest_thread.thread,
2192                    store,
2193                    callee_instance.index,
2194                )?;
2195                let old_thread = store.set_thread(Some(guest_thread));
2196                log::trace!(
2197                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2198                );
2199                let flags = self.id().get(store).instance_flags(callee_instance.index);
2200
2201                store.maybe_push_call_context(guest_thread.task)?;
2202
2203                // Unless this is a callback-less (i.e. stackful)
2204                // async-lifted export, we need to record that the instance
2205                // cannot be entered until the call returns.
2206                if !async_ {
2207                    store.enter_instance(callee_instance);
2208                }
2209
2210                // SAFETY: See the documentation for `make_call` to review the
2211                // contract we must uphold for `call` here.
2212                //
2213                // Per the contract described in the `queue_call`
2214                // documentation, the `callee` pointer which `call` closes
2215                // over must be valid.
2216                let storage = call(store)?;
2217
2218                if async_ {
2219                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2220                    if task.threads.len() == 1 && !task.returned_or_cancelled() {
2221                        bail!(Trap::NoAsyncResult);
2222                    }
2223                } else {
2224                    // This is a sync-lifted export, so now is when we lift the
2225                    // result, call the post-return function (if any),
2226                    // and finally notify any current or future waiters that the
2227                    // subtask has returned.
2228
2229                    let lift = {
2230                        store.exit_instance(callee_instance)?;
2231
2232                        let state = store.concurrent_state_mut();
2233                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2234
2235                        state
2236                            .get_mut(guest_thread.task)?
2237                            .lift_result
2238                            .take()
2239                            .unwrap()
2240                    };
2241
2242                    // SAFETY: `result_count` represents the number of core Wasm
2243                    // results returned, per `wasmparser`.
2244                    let result = (lift.lift)(store, unsafe {
2245                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2246                            &storage[..result_count],
2247                        )
2248                    })?;
2249
2250                    let post_return_arg = match result_count {
2251                        0 => ValRaw::i32(0),
2252                        // SAFETY: `result_count` represents the number of
2253                        // core Wasm results returned, per `wasmparser`.
2254                        1 => unsafe { storage[0].assume_init() },
2255                        _ => unreachable!(),
2256                    };
2257
2258                    unsafe {
2259                        call_post_return(
2260                            token.as_context_mut(store),
2261                            post_return.map(|v| v.as_non_null()),
2262                            post_return_arg,
2263                            flags,
2264                        )?;
2265                    }
2266
2267                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2268                }
2269
2270                // This is a callback-less call, so the implicit thread has now completed.
2271                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2272
2273                store.set_thread(old_thread);
2274
2275                store.maybe_pop_call_context(guest_thread.task)?;
2276
2277                let state = store.concurrent_state_mut();
2278                let task = state.get_mut(guest_thread.task)?;
2279
2280                match &task.caller {
2281                    Caller::Host { .. } => {
2282                        if task.ready_to_delete() {
2283                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2284                        }
2285                    }
2286                    Caller::Guest { .. } => {
2287                        task.exited = true;
2288                    }
2289                }
2290
2291                Ok(None)
2292            })
2293        };
2294
2295        store
2296            .0
2297            .concurrent_state_mut()
2298            .push_high_priority(WorkItem::GuestCall(GuestCall {
2299                thread: guest_thread,
2300                kind: GuestCallKind::StartImplicit(fun),
2301            }));
2302
2303        Ok(())
2304    }
2305
2306    /// Prepare (but do not start) a guest->guest call.
2307    ///
2308    /// This is called from fused adapter code generated in
2309    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2310    /// are synthesized Wasm functions which move the parameters from the caller
2311    /// to the callee and the result from the callee to the caller,
2312    /// respectively.  The adapter will call `Self::start_call` immediately
2313    /// after calling this function.
2314    ///
2315    /// SAFETY: All the pointer arguments must be valid pointers to guest
2316    /// entities (and with the expected signatures for the function references
2317    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2318    unsafe fn prepare_call<T: 'static>(
2319        self,
2320        mut store: StoreContextMut<T>,
2321        start: *mut VMFuncRef,
2322        return_: *mut VMFuncRef,
2323        caller_instance: RuntimeComponentInstanceIndex,
2324        callee_instance: RuntimeComponentInstanceIndex,
2325        task_return_type: TypeTupleIndex,
2326        callee_async: bool,
2327        memory: *mut VMMemoryDefinition,
2328        string_encoding: u8,
2329        caller_info: CallerInfo,
2330    ) -> Result<()> {
2331        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2332            // A task may only call an async-typed function via a sync lower if
2333            // it was created by a call to an async export.  Otherwise, we'll
2334            // trap.
2335            store.0.check_blocking()?;
2336        }
2337
2338        enum ResultInfo {
2339            Heap { results: u32 },
2340            Stack { result_count: u32 },
2341        }
2342
2343        let result_info = match &caller_info {
2344            CallerInfo::Async {
2345                has_result: true,
2346                params,
2347            } => ResultInfo::Heap {
2348                results: params.last().unwrap().get_u32(),
2349            },
2350            CallerInfo::Async {
2351                has_result: false, ..
2352            } => ResultInfo::Stack { result_count: 0 },
2353            CallerInfo::Sync {
2354                result_count,
2355                params,
2356            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2357                results: params.last().unwrap().get_u32(),
2358            },
2359            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2360                result_count: *result_count,
2361            },
2362        };
2363
2364        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2365
2366        // Create a new guest task for the call, closing over the `start` and
2367        // `return_` functions to lift the parameters and lower the result,
2368        // respectively.
2369        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2370        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2371        let token = StoreToken::new(store.as_context_mut());
2372        let state = store.0.concurrent_state_mut();
2373        let old_thread = state.guest_thread.unwrap();
2374
2375        assert_eq!(
2376            state.get_mut(old_thread.task)?.instance,
2377            RuntimeInstance {
2378                instance: self.id().instance(),
2379                index: caller_instance,
2380            }
2381        );
2382
2383        let new_task = GuestTask::new(
2384            state,
2385            Box::new(move |store, dst| {
2386                let mut store = token.as_context_mut(store);
2387                assert!(dst.len() <= MAX_FLAT_PARAMS);
2388                // The `+ 1` here accounts for the return pointer, if any:
2389                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2390                let count = match caller_info {
2391                    // Async callers, if they have a result, use the last
2392                    // parameter as a return pointer so chop that off if
2393                    // relevant here.
2394                    CallerInfo::Async { params, has_result } => {
2395                        let params = &params[..params.len() - usize::from(has_result)];
2396                        for (param, src) in params.iter().zip(&mut src) {
2397                            src.write(*param);
2398                        }
2399                        params.len()
2400                    }
2401
2402                    // Sync callers forward everything directly.
2403                    CallerInfo::Sync { params, .. } => {
2404                        for (param, src) in params.iter().zip(&mut src) {
2405                            src.write(*param);
2406                        }
2407                        params.len()
2408                    }
2409                };
2410                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2411                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2412                // how it was constructed (see
2413                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2414                // for details) we know it takes `count` parameters and returns
2415                // `dst.len()` results.
2416                unsafe {
2417                    crate::Func::call_unchecked_raw(
2418                        &mut store,
2419                        start.as_non_null(),
2420                        NonNull::new(
2421                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2422                        )
2423                        .unwrap(),
2424                    )?;
2425                }
2426                dst.copy_from_slice(&src[..dst.len()]);
2427                let state = store.0.concurrent_state_mut();
2428                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2429                    state,
2430                    Some(Event::Subtask {
2431                        status: Status::Started,
2432                    }),
2433                )?;
2434                Ok(())
2435            }),
2436            LiftResult {
2437                lift: Box::new(move |store, src| {
2438                    // SAFETY: See comment in closure passed as `lower_params`
2439                    // parameter above.
2440                    let mut store = token.as_context_mut(store);
2441                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2442                    if let ResultInfo::Heap { results } = &result_info {
2443                        my_src.push(ValRaw::u32(*results));
2444                    }
2445                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2446                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2447                    // on how it was constructed (see
2448                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2449                    // for details) we know it takes `src.len()` parameters and
2450                    // returns up to 1 result.
2451                    unsafe {
2452                        crate::Func::call_unchecked_raw(
2453                            &mut store,
2454                            return_.as_non_null(),
2455                            my_src.as_mut_slice().into(),
2456                        )?;
2457                    }
2458                    let state = store.0.concurrent_state_mut();
2459                    let thread = state.guest_thread.unwrap();
2460                    if sync_caller {
2461                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2462                            if let ResultInfo::Stack { result_count } = &result_info {
2463                                match result_count {
2464                                    0 => None,
2465                                    1 => Some(my_src[0]),
2466                                    _ => unreachable!(),
2467                                }
2468                            } else {
2469                                None
2470                            },
2471                        );
2472                    }
2473                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2474                }),
2475                ty: task_return_type,
2476                memory: NonNull::new(memory).map(SendSyncPtr::new),
2477                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2478            },
2479            Caller::Guest { thread: old_thread },
2480            None,
2481            RuntimeInstance {
2482                instance: self.id().instance(),
2483                index: callee_instance,
2484            },
2485            callee_async,
2486        )?;
2487
2488        let guest_task = state.push(new_task)?;
2489        let new_thread = GuestThread::new_implicit(guest_task);
2490        let guest_thread = state.push(new_thread)?;
2491        state.get_mut(guest_task)?.threads.insert(guest_thread);
2492
2493        store
2494            .0
2495            .concurrent_state_mut()
2496            .get_mut(old_thread.task)?
2497            .subtasks
2498            .insert(guest_task);
2499
2500        // Make the new thread the current one so that `Self::start_call` knows
2501        // which one to start.
2502        store.0.set_thread(Some(QualifiedThreadId {
2503            task: guest_task,
2504            thread: guest_thread,
2505        }));
2506        log::trace!(
2507            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2508        );
2509
2510        Ok(())
2511    }
2512
2513    /// Call the specified callback function for an async-lifted export.
2514    ///
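    /// The callback receives three flat parameters (the event ordinal, the
    /// waitable handle, and the event payload) and returns a callback code;
    /// see `handle_callback_code` above.
    ///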
2515    /// SAFETY: `function` must be a valid reference to a guest function of the
2516    /// correct signature for a callback.
2517    unsafe fn call_callback<T>(
2518        self,
2519        mut store: StoreContextMut<T>,
2520        function: SendSyncPtr<VMFuncRef>,
2521        event: Event,
2522        handle: u32,
2523    ) -> Result<u32> {
2524        let (ordinal, result) = event.parts();
2525        let params = &mut [
2526            ValRaw::u32(ordinal),
2527            ValRaw::u32(handle),
2528            ValRaw::u32(result),
2529        ];
2530        // SAFETY: `function` is a valid `*mut VMFuncRef` from either
2531        // `wasmtime-cranelift`-generated fused adapter code or
2532        // `component::Options`.  Per `wasmparser` callback signature
2533        // validation, we know it takes three parameters and returns one.
2534        unsafe {
2535            crate::Func::call_unchecked_raw(
2536                &mut store,
2537                function.as_non_null(),
2538                params.as_mut_slice().into(),
2539            )?;
2540        }
2541        Ok(params[0].get_u32())
2542    }
2543
2544    /// Start a guest->guest call previously prepared using
2545    /// `Self::prepare_call`.
2546    ///
2547    /// This is called from fused adapter code generated in
2548    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2549    /// this function immediately after calling `Self::prepare_call`.
2550    ///
2551    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2552    /// functions with the appropriate signatures for the current guest task.
2553    /// If this is a call to an async-lowered import, the actual call may be
2554    /// deferred and run after this function returns, in which case the pointer
2555    /// arguments must also be valid when the call happens.
2556    unsafe fn start_call<T: 'static>(
2557        self,
2558        mut store: StoreContextMut<T>,
2559        callback: *mut VMFuncRef,
2560        post_return: *mut VMFuncRef,
2561        callee: *mut VMFuncRef,
2562        param_count: u32,
2563        result_count: u32,
2564        flags: u32,
2565        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2566    ) -> Result<u32> {
2567        let token = StoreToken::new(store.as_context_mut());
2568        let async_caller = storage.is_none();
2569        let state = store.0.concurrent_state_mut();
2570        let guest_thread = state.guest_thread.unwrap();
2571        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2572        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2573        let param_count = usize::try_from(param_count).unwrap();
2574        assert!(param_count <= MAX_FLAT_PARAMS);
2575        let result_count = usize::try_from(result_count).unwrap();
2576        assert!(result_count <= MAX_FLAT_RESULTS);
2577
2578        let task = state.get_mut(guest_thread.task)?;
2579        if !callback.is_null() {
2580            // We're calling an async-lifted export with a callback, so store
2581            // the callback and related context as part of the task so we can
2582            // call it later when needed.
2583            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2584            task.callback = Some(Box::new(move |store, event, handle| {
2585                let store = token.as_context_mut(store);
2586                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2587            }));
2588        }
2589
2590        let Caller::Guest { thread: caller } = &task.caller else {
2591            // As of this writing, `start_call` is only used for guest->guest
2592            // calls.
2593            unreachable!()
2594        };
2595        let caller = *caller;
2596        let caller_instance = state.get_mut(caller.task)?.instance;
2597
2598        // Queue the call as a "high priority" work item.
2599        unsafe {
2600            self.queue_call(
2601                store.as_context_mut(),
2602                guest_thread,
2603                callee,
2604                param_count,
2605                result_count,
2606                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2607                NonNull::new(callback).map(SendSyncPtr::new),
2608                NonNull::new(post_return).map(SendSyncPtr::new),
2609            )?;
2610        }
2611
2612        let state = store.0.concurrent_state_mut();
2613
2614        // Use the caller's `GuestTask::sync_call_set` to register interest in
2615        // the subtask...
2616        let guest_waitable = Waitable::Guest(guest_thread.task);
2617        let old_set = guest_waitable.common(state)?.set;
2618        let set = state.get_mut(caller.task)?.sync_call_set;
2619        guest_waitable.join(state, Some(set))?;
2620
2621        // ... and suspend this fiber temporarily while we wait for it to start.
2622        //
2623        // Note that we _could_ call the callee directly using the current fiber
2624        // rather than suspend this one, but that would make reasoning about the
2625        // event loop more complicated and is probably only worth doing if
2626        // there's a measurable performance benefit.  In addition, it would mean
2627        // blocking the caller if the callee calls a blocking sync-lowered
2628        // import, and as of this writing the spec says we must not do that.
2629        //
2630        // Alternatively, the fused adapter code could be modified to call the
2631        // callee directly without calling a host-provided intrinsic at all (in
2632        // which case it would need to do its own, inline backpressure checks,
2633        // etc.).  Again, we'd want to see a measurable performance benefit
2634        // before committing to such an optimization.  And again, we'd need to
2635        // update the spec to allow that.
2636        let (status, waitable) = loop {
2637            store.0.suspend(SuspendReason::Waiting {
2638                set,
2639                thread: caller,
2640                // Normally, `StoreOpaque::suspend` would assert it's being
2641                // called from a context where blocking is allowed.  However, if
2642                // `async_caller` is `true`, we'll only "block" long enough for
2643                // the callee to start, i.e. we won't repeat this loop, so we
2644                // tell `suspend` it's okay even if we're not allowed to block.
2645                // Alternatively, if the callee is not an async function, then
2646                // we know it won't block anyway.
2647                skip_may_block_check: async_caller || !callee_async,
2648            })?;
2649
2650            let state = store.0.concurrent_state_mut();
2651
2652            log::trace!("taking event for {:?}", guest_thread.task);
2653            let event = guest_waitable.take_event(state)?;
2654            let Some(Event::Subtask { status }) = event else {
2655                unreachable!();
2656            };
2657
2658            log::trace!("status {status:?} for {:?}", guest_thread.task);
2659
2660            if status == Status::Returned {
2661                // It returned, so we can stop waiting.
2662                break (status, None);
2663            } else if async_caller {
2664                // It hasn't returned yet, but the caller is calling via an
2665                // async-lowered import, so we generate a handle for the task
2666                // waitable and return the status.
2667                let handle = store
2668                    .0
2669                    .instance_state(caller_instance)
2670                    .handle_table()
2671                    .subtask_insert_guest(guest_thread.task.rep())?;
2672                store
2673                    .0
2674                    .concurrent_state_mut()
2675                    .get_mut(guest_thread.task)?
2676                    .common
2677                    .handle = Some(handle);
2678                break (status, Some(handle));
2679            } else {
2680                // The callee hasn't returned yet, and the caller is calling via
2681                // a sync-lowered import, so we loop and keep waiting until the
2682                // callee returns.
2683            }
2684        };
2685
2686        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2687
2688        // Reset the current thread to point to the caller as it resumes control.
2689        store.0.set_thread(Some(caller));
2690        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2691        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2692
2693        if let Some(storage) = storage {
2694            // The caller used a sync-lowered import to call an async-lifted
2695            // export, in which case the result, if any, has been stashed in
2696            // `GuestTask::sync_result`.
2697            let state = store.0.concurrent_state_mut();
2698            let task = state.get_mut(guest_thread.task)?;
2699            if let Some(result) = task.sync_result.take() {
2700                if let Some(result) = result {
2701                    storage[0] = MaybeUninit::new(result);
2702                }
2703
2704                if task.exited && task.ready_to_delete() {
2705                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2706                }
2707            }
2708        }
2709
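        // Pack the final status together with the optional waitable handle
        // into the single `u32` return value.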
2710        Ok(status.pack(waitable))
2711    }
2712
2713    /// Poll the specified future once on behalf of a guest->host call using an
2714    /// async-lowered import.
2715    ///
2716    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2717    /// `Pending`, add it to the set of futures to be polled as part of this
2718    /// instance's event loop until it completes, and then return
2719    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2720    ///
2721    /// Whether the future returns `Ready` immediately or later, the `lower`
2722    /// function will be used to lower the result, if any, into the guest caller's
2723    /// stack and linear memory unless the task has been cancelled.
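    ///
    /// A minimal sketch of how a caller might use the result (hypothetical,
    /// for illustration only; the surrounding code is not a real API):
    ///
    /// ```ignore
    /// match instance.first_poll(store, future, caller_instance, lower)? {
    ///     // The future completed immediately: `lower` has already run and
    ///     // there is no waitable to hand back to the guest.
    ///     None => { /* report completion to the guest */ }
    ///     // The future is still pending: return the waitable handle to the
    ///     // guest so it can wait for the task to complete.
    ///     Some(handle) => { /* return `handle` to the guest */ }
    /// }
    /// ```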
2724    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2725        self,
2726        mut store: StoreContextMut<'_, T>,
2727        future: impl Future<Output = Result<R>> + Send + 'static,
2728        caller_instance: RuntimeComponentInstanceIndex,
2729        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2730    ) -> Result<Option<u32>> {
2731        let token = StoreToken::new(store.as_context_mut());
2732        let state = store.0.concurrent_state_mut();
2733        let caller = state.guest_thread.unwrap();
2734
2735        // Create an abortable future which hooks calls to poll and manages call
2736        // context state for the future.
2737        let (join_handle, future) = JoinHandle::run(async move {
2738            let mut future = pin!(future);
2739            let mut call_context = None;
2740            future::poll_fn(move |cx| {
2741                // Push the call context for managing any resource borrows
2742                // for the task.
2743                tls::get(|store| {
2744                    if let Some(call_context) = call_context.take() {
2745                        token
2746                            .as_context_mut(store)
2747                            .0
2748                            .component_resource_state()
2749                            .0
2750                            .push(call_context);
2751                    }
2752                });
2753
2754                let result = future.as_mut().poll(cx);
2755
2756                if result.is_pending() {
2757                    // Pop the call context for managing any resource
2758                    // borrows for the task.
2759                    tls::get(|store| {
2760                        call_context = Some(
2761                            token
2762                                .as_context_mut(store)
2763                                .0
2764                                .component_resource_state()
2765                                .0
2766                                .pop()
2767                                .unwrap(),
2768                        );
2769                    });
2770                }
2771                result
2772            })
2773            .await
2774        });
2775
2776        // We create a new host task even though it might complete immediately
2777        // (in which case we won't need to pass a waitable back to the guest).
2778        // If it does complete immediately, we'll remove it before we return.
2779        let task = state.push(HostTask::new(
2780            RuntimeInstance {
2781                instance: self.id().instance(),
2782                index: caller_instance,
2783            },
2784            Some(join_handle),
2785        ))?;
2786
2787        log::trace!("new host task child of {caller:?}: {task:?}");
2788
2789        let mut future = Box::pin(future);
2790
2791        // Finally, poll the future.  We can use a dummy `Waker` here because
2792        // we'll add the future to `ConcurrentState::futures` and poll it
2793        // automatically from the event loop if it doesn't complete immediately
2794        // here.
2795        let poll = tls::set(store.0, || {
2796            future
2797                .as_mut()
2798                .poll(&mut Context::from_waker(&Waker::noop()))
2799        });
2800
2801        Ok(match poll {
2802            Poll::Ready(None) => unreachable!(),
2803            Poll::Ready(Some(result)) => {
2804                // It finished immediately; lower the result and delete the
2805                // task.
2806                lower(store.as_context_mut(), result?)?;
2807                log::trace!("delete host task {task:?} (already ready)");
2808                store.0.concurrent_state_mut().delete(task)?;
2809                None
2810            }
2811            Poll::Pending => {
2812                // It hasn't finished yet; add the future to
2813                // `ConcurrentState::futures` so that it will be polled by the
2814                // event loop, and allocate a waitable handle to return to the guest.
2815
2816                // Wrap the future in a closure responsible for lowering the result into
2817                // the guest's stack and memory, as well as notifying any waiters that
2818                // the task returned.
2819                let future =
2820                    Box::pin(async move {
2821                        let result = match future.await {
2822                            Some(result) => result?,
2823                            // Task was cancelled; nothing left to do.
2824                            None => return Ok(()),
2825                        };
2826                        tls::get(move |store| {
2827                            // Here we schedule a task to run on a worker fiber to do
2828                            // the lowering since it may involve a call to the guest's
2829                            // realloc function.  This is necessary because calling the
2830                            // guest while there are host embedder frames on the stack
2831                            // is unsound.
2832                            store.concurrent_state_mut().push_high_priority(
2833                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2834                                    lower(token.as_context_mut(store), result)?;
2835                                    let state = store.concurrent_state_mut();
2836                                    state.get_mut(task)?.join_handle.take();
2837                                    Waitable::Host(task).set_event(
2838                                        state,
2839                                        Some(Event::Subtask {
2840                                            status: Status::Returned,
2841                                        }),
2842                                    )
2843                                }))),
2844                            );
2845                            Ok(())
2846                        })
2847                    });
2848
2849                store.0.concurrent_state_mut().push_future(future);
2850                let handle = store
2851                    .0
2852                    .instance_state(RuntimeInstance {
2853                        instance: self.id().instance(),
2854                        index: caller_instance,
2855                    })
2856                    .handle_table()
2857                    .subtask_insert_host(task.rep())?;
2858                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2859                log::trace!(
2860                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2861                );
2862                Some(handle)
2863            }
2864        })
2865    }
2866
2867    /// Implements the `task.return` intrinsic, lifting the result for the
2868    /// current guest task.
2869    pub(crate) fn task_return(
2870        self,
2871        store: &mut dyn VMStore,
2872        ty: TypeTupleIndex,
2873        options: OptionsIndex,
2874        storage: &[ValRaw],
2875    ) -> Result<()> {
2876        let state = store.concurrent_state_mut();
2877        let guest_thread = state.guest_thread.unwrap();
2878        let lift = state
2879            .get_mut(guest_thread.task)?
2880            .lift_result
2881            .take()
2882            .ok_or_else(|| {
2883                format_err!("`task.return` or `task.cancel` called more than once for current task")
2884            })?;
2885        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2886
2887        let CanonicalOptions {
2888            string_encoding,
2889            data_model,
2890            ..
2891        } = &self.id().get(store).component().env_component().options[options];
2892
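        // Check that the type and options passed to `task.return` match those
        // the task's result will be lifted with (tuple type, string encoding,
        // and linear memory).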
2893        let invalid = ty != lift.ty
2894            || string_encoding != &lift.string_encoding
2895            || match data_model {
2896                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2897                    Some(memory) => {
2898                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2899                        let actual = self.id().get(store).runtime_memory(memory);
2900                        expected != actual.as_ptr()
2901                    }
2902                    // No memory was specified, which validation only allows
2903                    // when one isn't needed, so this is not invalid.
2904                    None => false,
2905                },
2906                // Always invalid as this isn't supported.
2907                CanonicalOptionsDataModel::Gc { .. } => true,
2908            };
2909
2910        if invalid {
2911            bail!("invalid `task.return` signature and/or options for current task");
2912        }
2913
2914        log::trace!("task.return for {guest_thread:?}");
2915
2916        let result = (lift.lift)(store, storage)?;
2917        self.task_complete(store, guest_thread.task, result, Status::Returned)
2918    }
2919
2920    /// Implements the `task.cancel` intrinsic.
2921    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2922        let state = store.concurrent_state_mut();
2923        let guest_thread = state.guest_thread.unwrap();
2924        let task = state.get_mut(guest_thread.task)?;
2925        if !task.cancel_sent {
2926            bail!("`task.cancel` called by task which has not been cancelled")
2927        }
2928        _ = task.lift_result.take().ok_or_else(|| {
2929            format_err!("`task.return` or `task.cancel` called more than once for current task")
2930        })?;
2931
2932        assert!(task.result.is_none());
2933
2934        log::trace!("task.cancel for {guest_thread:?}");
2935
2936        self.task_complete(
2937            store,
2938            guest_thread.task,
2939            Box::new(DummyResult),
2940            Status::ReturnCancelled,
2941        )
2942    }
2943
2944    /// Complete the specified guest task (i.e. indicate that it has either
2945    /// returned a (possibly empty) result or cancelled itself).
2946    ///
2947    /// This will return any resource borrows and notify any current or future
2948    /// waiters that the task has completed.
2949    fn task_complete(
2950        self,
2951        store: &mut StoreOpaque,
2952        guest_task: TableId<GuestTask>,
2953        result: Box<dyn Any + Send + Sync>,
2954        status: Status,
2955    ) -> Result<()> {
2956        let (calls, host_table, _, instance) = store.component_resource_state_with_instance(self);
2957        ResourceTables {
2958            calls,
2959            host_table: Some(host_table),
2960            guest: Some(instance.instance_states()),
2961        }
2962        .exit_call()?;
2963
2964        let state = store.concurrent_state_mut();
2965        let task = state.get_mut(guest_task)?;
2966
2967        if let Caller::Host { tx, .. } = &mut task.caller {
2968            if let Some(tx) = tx.take() {
2969                _ = tx.send(result);
2970            }
2971        } else {
2972            task.result = Some(result);
2973            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2974        }
2975
2976        Ok(())
2977    }
2978
2979    /// Implements the `waitable-set.new` intrinsic.
2980    pub(crate) fn waitable_set_new(
2981        self,
2982        store: &mut StoreOpaque,
2983        caller_instance: RuntimeComponentInstanceIndex,
2984    ) -> Result<u32> {
2985        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2986        let handle = store
2987            .instance_state(RuntimeInstance {
2988                instance: self.id().instance(),
2989                index: caller_instance,
2990            })
2991            .handle_table()
2992            .waitable_set_insert(set.rep())?;
2993        log::trace!("new waitable set {set:?} (handle {handle})");
2994        Ok(handle)
2995    }
2996
2997    /// Implements the `waitable-set.drop` intrinsic.
2998    pub(crate) fn waitable_set_drop(
2999        self,
3000        store: &mut StoreOpaque,
3001        caller_instance: RuntimeComponentInstanceIndex,
3002        set: u32,
3003    ) -> Result<()> {
3004        let rep = store
3005            .instance_state(RuntimeInstance {
3006                instance: self.id().instance(),
3007                index: caller_instance,
3008            })
3009            .handle_table()
3010            .waitable_set_remove(set)?;
3011
3012        log::trace!("drop waitable set {rep} (handle {set})");
3013
3014        let set = store
3015            .concurrent_state_mut()
3016            .delete(TableId::<WaitableSet>::new(rep))?;
3017
3018        if !set.waiting.is_empty() {
3019            bail!("cannot drop waitable set with waiters");
3020        }
3021
3022        Ok(())
3023    }
3024
3025    /// Implements the `waitable.join` intrinsic.
3026    pub(crate) fn waitable_join(
3027        self,
3028        store: &mut StoreOpaque,
3029        caller_instance: RuntimeComponentInstanceIndex,
3030        waitable_handle: u32,
3031        set_handle: u32,
3032    ) -> Result<()> {
3033        let mut instance = self.id().get_mut(store);
3034        let waitable =
3035            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3036
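        // A `set_handle` of zero means "remove the waitable from whatever set
        // it currently belongs to, if any".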
3037        let set = if set_handle == 0 {
3038            None
3039        } else {
3040            let set = instance.instance_states().0[caller_instance]
3041                .handle_table()
3042                .waitable_set_rep(set_handle)?;
3043
3044            Some(TableId::<WaitableSet>::new(set))
3045        };
3046
3047        log::trace!(
3048            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3049        );
3050
3051        waitable.join(store.concurrent_state_mut(), set)
3052    }
3053
3054    /// Implements the `subtask.drop` intrinsic.
3055    pub(crate) fn subtask_drop(
3056        self,
3057        store: &mut StoreOpaque,
3058        caller_instance: RuntimeComponentInstanceIndex,
3059        task_id: u32,
3060    ) -> Result<()> {
3061        self.waitable_join(store, caller_instance, task_id, 0)?;
3062
3063        let (rep, is_host) = store
3064            .instance_state(RuntimeInstance {
3065                instance: self.id().instance(),
3066                index: caller_instance,
3067            })
3068            .handle_table()
3069            .subtask_remove(task_id)?;
3070
3071        let concurrent_state = store.concurrent_state_mut();
3072        let (waitable, expected_caller_instance, delete) = if is_host {
3073            let id = TableId::<HostTask>::new(rep);
3074            let task = concurrent_state.get_mut(id)?;
3075            if task.join_handle.is_some() {
3076                bail!("cannot drop a subtask which has not yet resolved");
3077            }
3078            (Waitable::Host(id), task.caller_instance, true)
3079        } else {
3080            let id = TableId::<GuestTask>::new(rep);
3081            let task = concurrent_state.get_mut(id)?;
3082            if task.lift_result.is_some() {
3083                bail!("cannot drop a subtask which has not yet resolved");
3084            }
3085            if let &Caller::Guest { thread } = &task.caller {
3086                (
3087                    Waitable::Guest(id),
3088                    concurrent_state.get_mut(thread.task)?.instance,
3089                    concurrent_state.get_mut(id)?.exited,
3090                )
3091            } else {
3092                unreachable!()
3093            }
3094        };
3095
3096        waitable.common(concurrent_state)?.handle = None;
3097
3098        if waitable.take_event(concurrent_state)?.is_some() {
3099            bail!("cannot drop a subtask with an undelivered event");
3100        }
3101
3102        if delete {
3103            waitable.delete_from(concurrent_state)?;
3104        }
3105
3106        // Since waitables can neither be passed between instances nor forged,
3107        // this should never fail unless there's a bug in Wasmtime, but we check
3108        // here to be sure:
3109        assert_eq!(
3110            expected_caller_instance,
3111            RuntimeInstance {
3112                instance: self.id().instance(),
3113                index: caller_instance
3114            }
3115        );
3116        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3117        Ok(())
3118    }
3119
3120    /// Implements the `waitable-set.wait` intrinsic.
3121    pub(crate) fn waitable_set_wait(
3122        self,
3123        store: &mut StoreOpaque,
3124        options: OptionsIndex,
3125        set: u32,
3126        payload: u32,
3127    ) -> Result<u32> {
3128        if !self.options(store, options).async_ {
3129            // The caller may only call `waitable-set.wait` from an async task
3130            // (i.e. a task created via a call to an async export).
3131            // Otherwise, we'll trap.
3132            store.check_blocking()?;
3133        }
3134
3135        let &CanonicalOptions {
3136            cancellable,
3137            instance: caller_instance,
3138            ..
3139        } = &self.id().get(store).component().env_component().options[options];
3140        let rep = store
3141            .instance_state(RuntimeInstance {
3142                instance: self.id().instance(),
3143                index: caller_instance,
3144            })
3145            .handle_table()
3146            .waitable_set_rep(set)?;
3147
3148        self.waitable_check(
3149            store,
3150            cancellable,
3151            WaitableCheck::Wait,
3152            WaitableCheckParams {
3153                set: TableId::new(rep),
3154                options,
3155                payload,
3156            },
3157        )
3158    }
3159
3160    /// Implements the `waitable-set.poll` intrinsic.
3161    pub(crate) fn waitable_set_poll(
3162        self,
3163        store: &mut StoreOpaque,
3164        options: OptionsIndex,
3165        set: u32,
3166        payload: u32,
3167    ) -> Result<u32> {
3168        let &CanonicalOptions {
3169            cancellable,
3170            instance: caller_instance,
3171            ..
3172        } = &self.id().get(store).component().env_component().options[options];
3173        let rep = store
3174            .instance_state(RuntimeInstance {
3175                instance: self.id().instance(),
3176                index: caller_instance,
3177            })
3178            .handle_table()
3179            .waitable_set_rep(set)?;
3180
3181        self.waitable_check(
3182            store,
3183            cancellable,
3184            WaitableCheck::Poll,
3185            WaitableCheckParams {
3186                set: TableId::new(rep),
3187                options,
3188                payload,
3189            },
3190        )
3191    }
3192
3193    /// Implements the `thread.index` intrinsic.
3194    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3195        let thread_id = store.concurrent_state_mut().guest_thread.unwrap().thread;
3196        // The unwrap is safe because `instance_rep` must be `Some` by this point
3197        Ok(store
3198            .concurrent_state_mut()
3199            .get_mut(thread_id)?
3200            .instance_rep
3201            .unwrap())
3202    }
3203
3204    /// Implements the `thread.new-indirect` intrinsic.
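    ///
    /// The start function must currently have the core type `(i32) -> ()`;
    /// the `context` value is passed as its single argument when the new
    /// thread runs.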
3205    pub(crate) fn thread_new_indirect<T: 'static>(
3206        self,
3207        mut store: StoreContextMut<T>,
3208        runtime_instance: RuntimeComponentInstanceIndex,
3209        _func_ty_idx: TypeFuncIndex, // currently unused
3210        start_func_table_idx: RuntimeTableIndex,
3211        start_func_idx: u32,
3212        context: i32,
3213    ) -> Result<u32> {
3214        log::trace!("creating new thread");
3215
3216        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3217        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3218        let callee = instance
3219            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3220            .ok_or_else(|| {
3221                format_err!("the start function index points to an uninitialized function")
3222            })?;
3223        if callee.type_index(store.0) != start_func_ty.type_index() {
3224            bail!(
3225                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3226            );
3227        }
3228
3229        let token = StoreToken::new(store.as_context_mut());
3230        let start_func = Box::new(
3231            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3232                let old_thread = store.set_thread(Some(guest_thread));
3233                log::trace!(
3234                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3235                );
3236
3237                store.maybe_push_call_context(guest_thread.task)?;
3238
3239                let mut store = token.as_context_mut(store);
3240                let mut params = [ValRaw::i32(context)];
3241                // Use `call_unchecked` rather than `call` or `call_async`, as we don't want
3242                // to run the function on a separate fiber if we're running in an async store.
3243                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3244
3245                store.0.maybe_pop_call_context(guest_thread.task)?;
3246
3247                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3248                log::trace!("explicit thread {guest_thread:?} completed");
3249                let state = store.0.concurrent_state_mut();
3250                let task = state.get_mut(guest_thread.task)?;
3251                if task.threads.is_empty() && !task.returned_or_cancelled() {
3252                    bail!(Trap::NoAsyncResult);
3253                }
3254                store.0.set_thread(old_thread);
3255                let state = store.0.concurrent_state_mut();
3256                old_thread
3257                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3258                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3259                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3260                }
3261                log::trace!("thread start: restored {old_thread:?} as current thread");
3262
3263                Ok(())
3264            },
3265        );
3266
3267        let state = store.0.concurrent_state_mut();
3268        let current_thread = state.guest_thread.unwrap();
3269        let parent_task = current_thread.task;
3270
3271        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3272        let thread_id = state.push(new_thread)?;
3273        state.get_mut(parent_task)?.threads.insert(thread_id);
3274
3275        log::trace!("new thread with id {thread_id:?} created");
3276
3277        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3278    }
3279
3280    pub(crate) fn resume_suspended_thread(
3281        self,
3282        store: &mut StoreOpaque,
3283        runtime_instance: RuntimeComponentInstanceIndex,
3284        thread_idx: u32,
3285        high_priority: bool,
3286    ) -> Result<()> {
3287        let thread_id =
3288            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3289        let state = store.concurrent_state_mut();
3290        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3291        let thread = state.get_mut(guest_thread.thread)?;
3292
3293        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3294            GuestThreadState::NotStartedExplicit(start_func) => {
3295                log::trace!("starting thread {guest_thread:?}");
3296                let guest_call = WorkItem::GuestCall(GuestCall {
3297                    thread: guest_thread,
3298                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3299                        start_func(store, guest_thread)
3300                    })),
3301                });
3302                store
3303                    .concurrent_state_mut()
3304                    .push_work_item(guest_call, high_priority);
3305            }
3306            GuestThreadState::Suspended(fiber) => {
3307                log::trace!("resuming thread {thread_id:?} that was suspended");
3308                store
3309                    .concurrent_state_mut()
3310                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3311            }
3312            _ => {
3313                bail!("cannot resume thread which is not suspended");
3314            }
3315        }
3316        Ok(())
3317    }
3318
3319    fn add_guest_thread_to_instance_table(
3320        self,
3321        thread_id: TableId<GuestThread>,
3322        store: &mut StoreOpaque,
3323        runtime_instance: RuntimeComponentInstanceIndex,
3324    ) -> Result<u32> {
3325        let guest_id = store
3326            .instance_state(RuntimeInstance {
3327                instance: self.id().instance(),
3328                index: runtime_instance,
3329            })
3330            .thread_handle_table()
3331            .guest_thread_insert(thread_id.rep())?;
3332        store
3333            .concurrent_state_mut()
3334            .get_mut(thread_id)?
3335            .instance_rep = Some(guest_id);
3336        Ok(guest_id)
3337    }
3338
3339    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3340    /// and `thread.switch-to` intrinsics.
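    ///
    /// The combination of `yielding` and `to_thread` determines which
    /// intrinsic is being serviced:
    ///
    /// - `yielding` without `to_thread`: `thread.yield`
    /// - `yielding` with `to_thread`: `thread.yield-to`
    /// - not `yielding` without `to_thread`: `thread.suspend`
    /// - not `yielding` with `to_thread`: `thread.switch-to`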
3341    pub(crate) fn suspension_intrinsic(
3342        self,
3343        store: &mut StoreOpaque,
3344        caller: RuntimeComponentInstanceIndex,
3345        cancellable: bool,
3346        yielding: bool,
3347        to_thread: Option<u32>,
3348    ) -> Result<WaitResult> {
3349        if to_thread.is_none() {
3350            let state = store.concurrent_state_mut();
3351            if yielding {
3352                // This is a `thread.yield` call
3353                if !state.may_block(state.guest_thread.unwrap().task) {
3354                    // The spec defines `thread.yield` to be a no-op in a
3355                    // non-blocking context, so we return immediately without giving
3356                    // any other thread a chance to run.
3357                    return Ok(WaitResult::Completed);
3358                }
3359            } else {
3360                // The caller may only call `thread.suspend` from an async task
3361                // (i.e. a task created via a call to an async export).
3362                // Otherwise, we'll trap.
3363                store.check_blocking()?;
3364            }
3365        }
3366
3367        // There could be a pending cancellation from a previous uncancellable wait
3368        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3369            return Ok(WaitResult::Cancelled);
3370        }
3371
3372        if let Some(thread) = to_thread {
3373            self.resume_suspended_thread(store, caller, thread, true)?;
3374        }
3375
3376        let state = store.concurrent_state_mut();
3377        let guest_thread = state.guest_thread.unwrap();
3378        let reason = if yielding {
3379            SuspendReason::Yielding {
3380                thread: guest_thread,
3381                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3382                // we're handling a `thread.yield-to` call; otherwise it would
3383                // panic if we called it in a non-blocking context.
3384                skip_may_block_check: to_thread.is_some(),
3385            }
3386        } else {
3387            SuspendReason::ExplicitlySuspending {
3388                thread: guest_thread,
3389                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3390                // we're handling a `thread.switch-to` call; otherwise it would
3391                // panic if we called it in a non-blocking context.
3392                skip_may_block_check: to_thread.is_some(),
3393            }
3394        };
3395
3396        store.suspend(reason)?;
3397
3398        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3399            Ok(WaitResult::Cancelled)
3400        } else {
3401            Ok(WaitResult::Completed)
3402        }
3403    }
3404
3405    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3406    fn waitable_check(
3407        self,
3408        store: &mut StoreOpaque,
3409        cancellable: bool,
3410        check: WaitableCheck,
3411        params: WaitableCheckParams,
3412    ) -> Result<u32> {
3413        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3414
3415        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3416
3417        let state = store.concurrent_state_mut();
3418        let task = state.get_mut(guest_thread.task)?;
3419
3420        // If we're waiting, and there are no events immediately available,
3421        // suspend the fiber until that changes.
3422        match &check {
3423            WaitableCheck::Wait => {
3424                let set = params.set;
3425
3426                if (task.event.is_none()
3427                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3428                    && state.get_mut(set)?.ready.is_empty()
3429                {
3430                    if cancellable {
3431                        let old = state
3432                            .get_mut(guest_thread.thread)?
3433                            .wake_on_cancel
3434                            .replace(set);
3435                        assert!(old.is_none());
3436                    }
3437
3438                    store.suspend(SuspendReason::Waiting {
3439                        set,
3440                        thread: guest_thread,
3441                        skip_may_block_check: false,
3442                    })?;
3443                }
3444            }
3445            WaitableCheck::Poll => {}
3446        }
3447
3448        log::trace!(
3449            "waitable check for {guest_thread:?}; set {:?}, part two",
3450            params.set
3451        );
3452
3453        // Deliver any pending events to the guest and return.
3454        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3455
3456        let (ordinal, handle, result) = match &check {
3457            WaitableCheck::Wait => {
3458                let (event, waitable) = event.unwrap();
3459                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3460                let (ordinal, result) = event.parts();
3461                (ordinal, handle, result)
3462            }
3463            WaitableCheck::Poll => {
3464                if let Some((event, waitable)) = event {
3465                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3466                    let (ordinal, result) = event.parts();
3467                    (ordinal, handle, result)
3468                } else {
3469                    log::trace!(
3470                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3471                        guest_thread.task,
3472                        params.set
3473                    );
3474                    let (ordinal, result) = Event::None.parts();
3475                    (ordinal, 0, result)
3476                }
3477            }
3478        };
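        // Write the (waitable handle, event payload) pair into the guest's
        // linear memory at the address the guest supplied, then return the
        // event ordinal as the intrinsic's result.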
3479        let memory = self.options_memory_mut(store, params.options);
3480        let ptr = func::validate_inbounds_dynamic(
3481            &CanonicalAbiInfo::POINTER_PAIR,
3482            memory,
3483            &ValRaw::u32(params.payload),
3484        )?;
3485        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3486        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3487        Ok(ordinal)
3488    }
3489
3490    /// Implements the `subtask.cancel` intrinsic.
3491    pub(crate) fn subtask_cancel(
3492        self,
3493        store: &mut StoreOpaque,
3494        caller_instance: RuntimeComponentInstanceIndex,
3495        async_: bool,
3496        task_id: u32,
3497    ) -> Result<u32> {
3498        if !async_ {
3499            // The caller may only call `subtask.cancel` synchronously from an
3500            // async task (i.e. a task created via a call to an async export).
3501            // Otherwise, we'll trap.
3502            store.check_blocking()?;
3503        }
3504
3505        let (rep, is_host) = store
3506            .instance_state(RuntimeInstance {
3507                instance: self.id().instance(),
3508                index: caller_instance,
3509            })
3510            .handle_table()
3511            .subtask_rep(task_id)?;
3512        let (waitable, expected_caller_instance) = if is_host {
3513            let id = TableId::<HostTask>::new(rep);
3514            (
3515                Waitable::Host(id),
3516                store.concurrent_state_mut().get_mut(id)?.caller_instance,
3517            )
3518        } else {
3519            let id = TableId::<GuestTask>::new(rep);
3520            if let &Caller::Guest { thread } = &store.concurrent_state_mut().get_mut(id)?.caller {
3521                (
3522                    Waitable::Guest(id),
3523                    store.concurrent_state_mut().get_mut(thread.task)?.instance,
3524                )
3525            } else {
3526                unreachable!()
3527            }
3528        };
3529        // Since waitables can neither be passed between instances nor forged,
3530        // this should never fail unless there's a bug in Wasmtime, but we check
3531        // here to be sure:
3532        assert_eq!(
3533            expected_caller_instance,
3534            RuntimeInstance {
3535                instance: self.id().instance(),
3536                index: caller_instance
3537            }
3538        );
3539
3540        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3541
3542        let concurrent_state = store.concurrent_state_mut();
3543        if let Waitable::Host(host_task) = waitable {
3544            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3545                handle.abort();
3546                return Ok(Status::ReturnCancelled as u32);
3547            }
3548        } else {
3549            let caller = concurrent_state.guest_thread.unwrap();
3550            let guest_task = TableId::<GuestTask>::new(rep);
3551            let task = concurrent_state.get_mut(guest_task)?;
3552            if !task.already_lowered_parameters() {
3553                // The task is in a `starting` state, meaning it hasn't run at
3554                // all yet.  Here we update its fields to indicate that it is
3555                // ready to delete immediately once `subtask.drop` is called.
3556                task.lower_params = None;
3557                task.lift_result = None;
3558                task.exited = true;
3559
3560                let instance = task.instance;
3561
3562                assert_eq!(1, task.threads.len());
3563                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3564                let concurrent_state = store.concurrent_state_mut();
3565                concurrent_state.delete(thread)?;
3566                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3567
3568                // Not yet started; cancel and remove from pending
3569                let pending = &mut store.instance_state(instance).concurrent_state().pending;
3570                let pending_count = pending.len();
3571                pending.retain(|thread, _| thread.task != guest_task);
3572                // If there were no pending threads for this task, we're in an error state
3573                if pending.len() == pending_count {
3574                    bail!("`subtask.cancel` called after terminal status delivered");
3575                }
3576                return Ok(Status::StartCancelled as u32);
3577            } else if !task.returned_or_cancelled() {
3578                // Started, but not yet returned or cancelled; send the
3579                // `CANCELLED` event
3580                task.cancel_sent = true;
3581                // Note that this might overwrite an event that was set earlier
3582                // (e.g. `Event::None` if the task is yielding, or
3583                // `Event::Cancelled` if it was already cancelled), but that's
3584                // okay -- this should supersede the previous state.
3585                task.event = Some(Event::Cancelled);
3586                for thread in task.threads.clone() {
3587                    let thread = QualifiedThreadId {
3588                        task: guest_task,
3589                        thread,
3590                    };
3591                    if let Some(set) = concurrent_state
3592                        .get_mut(thread.thread)
3593                        .unwrap()
3594                        .wake_on_cancel
3595                        .take()
3596                    {
3597                        let item = match concurrent_state
3598                            .get_mut(set)?
3599                            .waiting
3600                            .remove(&thread)
3601                            .unwrap()
3602                        {
3603                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3604                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3605                                thread,
3606                                kind: GuestCallKind::DeliverEvent {
3607                                    instance,
3608                                    set: None,
3609                                },
3610                            }),
3611                        };
3612                        concurrent_state.push_high_priority(item);
3613
3614                        store.suspend(SuspendReason::Yielding {
3615                            thread: caller,
3616                            // `subtask.cancel` is not allowed to be called in a
3617                            // sync context, so we cannot skip the may-block check.
3618                            skip_may_block_check: false,
3619                        })?;
3620                        break;
3621                    }
3622                }
3623
3624                let concurrent_state = store.concurrent_state_mut();
3625                let task = concurrent_state.get_mut(guest_task)?;
3626                if !task.returned_or_cancelled() {
3627                    if async_ {
3628                        return Ok(BLOCKED);
3629                    } else {
3630                        store.wait_for_event(Waitable::Guest(guest_task))?;
3631                    }
3632                }
3633            }
3634        }
3635
3636        let event = waitable.take_event(store.concurrent_state_mut())?;
3637        if let Some(Event::Subtask {
3638            status: status @ (Status::Returned | Status::ReturnCancelled),
3639        }) = event
3640        {
3641            Ok(status as u32)
3642        } else {
3643            bail!("`subtask.cancel` called after terminal status delivered");
3644        }
3645    }
3646
3647    pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3648        store.concurrent_state_mut().context_get(slot)
3649    }
3650
3651    pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3652        store.concurrent_state_mut().context_set(slot, value)
3653    }
3654}
3655
3656/// Trait representing component model ABI async intrinsics and fused adapter
3657/// helper functions.
3658///
3659/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3660/// which must be valid for at least the duration of the call (and possibly for
3661/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3662/// pointers used for async calls).
3663pub trait VMComponentAsyncStore {
3664    /// A helper function for fused adapter modules involving calls where at
3665    /// least one of the caller or callee is async.
3666    ///
3667    /// This helper is not used when the caller and callee both use the sync
3668    /// ABI; it is only used when at least one of them is async.
3669    unsafe fn prepare_call(
3670        &mut self,
3671        instance: Instance,
3672        memory: *mut VMMemoryDefinition,
3673        start: *mut VMFuncRef,
3674        return_: *mut VMFuncRef,
3675        caller_instance: RuntimeComponentInstanceIndex,
3676        callee_instance: RuntimeComponentInstanceIndex,
3677        task_return_type: TypeTupleIndex,
3678        callee_async: bool,
3679        string_encoding: u8,
3680        result_count: u32,
3681        storage: *mut ValRaw,
3682        storage_len: usize,
3683    ) -> Result<()>;
3684
3685    /// A helper function for fused adapter modules involving calls where the
3686    /// caller is sync-lowered but the callee is async-lifted.
3687    unsafe fn sync_start(
3688        &mut self,
3689        instance: Instance,
3690        callback: *mut VMFuncRef,
3691        callee: *mut VMFuncRef,
3692        param_count: u32,
3693        storage: *mut MaybeUninit<ValRaw>,
3694        storage_len: usize,
3695    ) -> Result<()>;
3696
3697    /// A helper function for fused adapter modules involving calls where the
3698    /// caller is async-lowered.
3699    unsafe fn async_start(
3700        &mut self,
3701        instance: Instance,
3702        callback: *mut VMFuncRef,
3703        post_return: *mut VMFuncRef,
3704        callee: *mut VMFuncRef,
3705        param_count: u32,
3706        result_count: u32,
3707        flags: u32,
3708    ) -> Result<u32>;
3709
3710    /// The `future.write` intrinsic.
3711    fn future_write(
3712        &mut self,
3713        instance: Instance,
3714        caller: RuntimeComponentInstanceIndex,
3715        ty: TypeFutureTableIndex,
3716        options: OptionsIndex,
3717        future: u32,
3718        address: u32,
3719    ) -> Result<u32>;
3720
3721    /// The `future.read` intrinsic.
3722    fn future_read(
3723        &mut self,
3724        instance: Instance,
3725        caller: RuntimeComponentInstanceIndex,
3726        ty: TypeFutureTableIndex,
3727        options: OptionsIndex,
3728        future: u32,
3729        address: u32,
3730    ) -> Result<u32>;
3731
3732    /// The `future.drop-writable` intrinsic.
3733    fn future_drop_writable(
3734        &mut self,
3735        instance: Instance,
3736        ty: TypeFutureTableIndex,
3737        writer: u32,
3738    ) -> Result<()>;
3739
3740    /// The `stream.write` intrinsic.
3741    fn stream_write(
3742        &mut self,
3743        instance: Instance,
3744        caller: RuntimeComponentInstanceIndex,
3745        ty: TypeStreamTableIndex,
3746        options: OptionsIndex,
3747        stream: u32,
3748        address: u32,
3749        count: u32,
3750    ) -> Result<u32>;
3751
3752    /// The `stream.read` intrinsic.
3753    fn stream_read(
3754        &mut self,
3755        instance: Instance,
3756        caller: RuntimeComponentInstanceIndex,
3757        ty: TypeStreamTableIndex,
3758        options: OptionsIndex,
3759        stream: u32,
3760        address: u32,
3761        count: u32,
3762    ) -> Result<u32>;
3763
3764    /// The "fast-path" implementation of the `stream.write` intrinsic for
3765    /// "flat" (i.e. memcpy-able) payloads.
3766    fn flat_stream_write(
3767        &mut self,
3768        instance: Instance,
3769        caller: RuntimeComponentInstanceIndex,
3770        ty: TypeStreamTableIndex,
3771        options: OptionsIndex,
3772        payload_size: u32,
3773        payload_align: u32,
3774        stream: u32,
3775        address: u32,
3776        count: u32,
3777    ) -> Result<u32>;
3778
3779    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3780    /// (i.e. memcpy-able) payloads.
3781    fn flat_stream_read(
3782        &mut self,
3783        instance: Instance,
3784        caller: RuntimeComponentInstanceIndex,
3785        ty: TypeStreamTableIndex,
3786        options: OptionsIndex,
3787        payload_size: u32,
3788        payload_align: u32,
3789        stream: u32,
3790        address: u32,
3791        count: u32,
3792    ) -> Result<u32>;
3793
3794    /// The `stream.drop-writable` intrinsic.
3795    fn stream_drop_writable(
3796        &mut self,
3797        instance: Instance,
3798        ty: TypeStreamTableIndex,
3799        writer: u32,
3800    ) -> Result<()>;
3801
3802    /// The `error-context.debug-message` intrinsic.
3803    fn error_context_debug_message(
3804        &mut self,
3805        instance: Instance,
3806        ty: TypeComponentLocalErrorContextTableIndex,
3807        options: OptionsIndex,
3808        err_ctx_handle: u32,
3809        debug_msg_address: u32,
3810    ) -> Result<()>;
3811
3812    /// The `thread.new-indirect` intrinsic.
3813    fn thread_new_indirect(
3814        &mut self,
3815        instance: Instance,
3816        caller: RuntimeComponentInstanceIndex,
3817        func_ty_idx: TypeFuncIndex,
3818        start_func_table_idx: RuntimeTableIndex,
3819        start_func_idx: u32,
3820        context: i32,
3821    ) -> Result<u32>;
3822}
3823
3824/// SAFETY: See trait docs.
3825impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3826    unsafe fn prepare_call(
3827        &mut self,
3828        instance: Instance,
3829        memory: *mut VMMemoryDefinition,
3830        start: *mut VMFuncRef,
3831        return_: *mut VMFuncRef,
3832        caller_instance: RuntimeComponentInstanceIndex,
3833        callee_instance: RuntimeComponentInstanceIndex,
3834        task_return_type: TypeTupleIndex,
3835        callee_async: bool,
3836        string_encoding: u8,
3837        result_count_or_max_if_async: u32,
3838        storage: *mut ValRaw,
3839        storage_len: usize,
3840    ) -> Result<()> {
3841        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3842        // this method will have ensured that `storage` is a valid
3843        // pointer containing at least `storage_len` items.
3844        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3845
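        // `result_count_or_max_if_async` doubles as a discriminant: the two
        // sentinel values indicate an async caller (without or with a result,
        // respectively), while any other value is the literal result count of
        // a sync caller.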
3846        unsafe {
3847            instance.prepare_call(
3848                StoreContextMut(self),
3849                start,
3850                return_,
3851                caller_instance,
3852                callee_instance,
3853                task_return_type,
3854                callee_async,
3855                memory,
3856                string_encoding,
3857                match result_count_or_max_if_async {
3858                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3859                        params,
3860                        has_result: false,
3861                    },
3862                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3863                        params,
3864                        has_result: true,
3865                    },
3866                    result_count => CallerInfo::Sync {
3867                        params,
3868                        result_count,
3869                    },
3870                },
3871            )
3872        }
3873    }
3874
3875    unsafe fn sync_start(
3876        &mut self,
3877        instance: Instance,
3878        callback: *mut VMFuncRef,
3879        callee: *mut VMFuncRef,
3880        param_count: u32,
3881        storage: *mut MaybeUninit<ValRaw>,
3882        storage_len: usize,
3883    ) -> Result<()> {
3884        unsafe {
3885            instance
3886                .start_call(
3887                    StoreContextMut(self),
3888                    callback,
3889                    ptr::null_mut(),
3890                    callee,
3891                    param_count,
3892                    1,
3893                    START_FLAG_ASYNC_CALLEE,
3894                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3895                    // this method will have ensured that `storage` is a valid
3896                    // pointer containing at least `storage_len` items.
3897                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3898                )
3899                .map(drop)
3900        }
3901    }
3902
3903    unsafe fn async_start(
3904        &mut self,
3905        instance: Instance,
3906        callback: *mut VMFuncRef,
3907        post_return: *mut VMFuncRef,
3908        callee: *mut VMFuncRef,
3909        param_count: u32,
3910        result_count: u32,
3911        flags: u32,
3912    ) -> Result<u32> {
3913        unsafe {
3914            instance.start_call(
3915                StoreContextMut(self),
3916                callback,
3917                post_return,
3918                callee,
3919                param_count,
3920                result_count,
3921                flags,
3922                None,
3923            )
3924        }
3925    }
3926
3927    fn future_write(
3928        &mut self,
3929        instance: Instance,
3930        caller: RuntimeComponentInstanceIndex,
3931        ty: TypeFutureTableIndex,
3932        options: OptionsIndex,
3933        future: u32,
3934        address: u32,
3935    ) -> Result<u32> {
3936        instance
3937            .guest_write(
3938                StoreContextMut(self),
3939                caller,
3940                TransmitIndex::Future(ty),
3941                options,
3942                None,
3943                future,
3944                address,
3945                1,
3946            )
3947            .map(|result| result.encode())
3948    }
3949
3950    fn future_read(
3951        &mut self,
3952        instance: Instance,
3953        caller: RuntimeComponentInstanceIndex,
3954        ty: TypeFutureTableIndex,
3955        options: OptionsIndex,
3956        future: u32,
3957        address: u32,
3958    ) -> Result<u32> {
3959        instance
3960            .guest_read(
3961                StoreContextMut(self),
3962                caller,
3963                TransmitIndex::Future(ty),
3964                options,
3965                None,
3966                future,
3967                address,
3968                1,
3969            )
3970            .map(|result| result.encode())
3971    }
3972
3973    fn stream_write(
3974        &mut self,
3975        instance: Instance,
3976        caller: RuntimeComponentInstanceIndex,
3977        ty: TypeStreamTableIndex,
3978        options: OptionsIndex,
3979        stream: u32,
3980        address: u32,
3981        count: u32,
3982    ) -> Result<u32> {
3983        instance
3984            .guest_write(
3985                StoreContextMut(self),
3986                caller,
3987                TransmitIndex::Stream(ty),
3988                options,
3989                None,
3990                stream,
3991                address,
3992                count,
3993            )
3994            .map(|result| result.encode())
3995    }
3996
3997    fn stream_read(
3998        &mut self,
3999        instance: Instance,
4000        caller: RuntimeComponentInstanceIndex,
4001        ty: TypeStreamTableIndex,
4002        options: OptionsIndex,
4003        stream: u32,
4004        address: u32,
4005        count: u32,
4006    ) -> Result<u32> {
4007        instance
4008            .guest_read(
4009                StoreContextMut(self),
4010                caller,
4011                TransmitIndex::Stream(ty),
4012                options,
4013                None,
4014                stream,
4015                address,
4016                count,
4017            )
4018            .map(|result| result.encode())
4019    }
4020
4021    fn future_drop_writable(
4022        &mut self,
4023        instance: Instance,
4024        ty: TypeFutureTableIndex,
4025        writer: u32,
4026    ) -> Result<()> {
4027        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4028    }
4029
4030    fn flat_stream_write(
4031        &mut self,
4032        instance: Instance,
4033        caller: RuntimeComponentInstanceIndex,
4034        ty: TypeStreamTableIndex,
4035        options: OptionsIndex,
4036        payload_size: u32,
4037        payload_align: u32,
4038        stream: u32,
4039        address: u32,
4040        count: u32,
4041    ) -> Result<u32> {
4042        instance
4043            .guest_write(
4044                StoreContextMut(self),
4045                caller,
4046                TransmitIndex::Stream(ty),
4047                options,
4048                Some(FlatAbi {
4049                    size: payload_size,
4050                    align: payload_align,
4051                }),
4052                stream,
4053                address,
4054                count,
4055            )
4056            .map(|result| result.encode())
4057    }
4058
4059    fn flat_stream_read(
4060        &mut self,
4061        instance: Instance,
4062        caller: RuntimeComponentInstanceIndex,
4063        ty: TypeStreamTableIndex,
4064        options: OptionsIndex,
4065        payload_size: u32,
4066        payload_align: u32,
4067        stream: u32,
4068        address: u32,
4069        count: u32,
4070    ) -> Result<u32> {
4071        instance
4072            .guest_read(
4073                StoreContextMut(self),
4074                caller,
4075                TransmitIndex::Stream(ty),
4076                options,
4077                Some(FlatAbi {
4078                    size: payload_size,
4079                    align: payload_align,
4080                }),
4081                stream,
4082                address,
4083                count,
4084            )
4085            .map(|result| result.encode())
4086    }
4087
4088    fn stream_drop_writable(
4089        &mut self,
4090        instance: Instance,
4091        ty: TypeStreamTableIndex,
4092        writer: u32,
4093    ) -> Result<()> {
4094        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4095    }
4096
4097    fn error_context_debug_message(
4098        &mut self,
4099        instance: Instance,
4100        ty: TypeComponentLocalErrorContextTableIndex,
4101        options: OptionsIndex,
4102        err_ctx_handle: u32,
4103        debug_msg_address: u32,
4104    ) -> Result<()> {
4105        instance.error_context_debug_message(
4106            StoreContextMut(self),
4107            ty,
4108            options,
4109            err_ctx_handle,
4110            debug_msg_address,
4111        )
4112    }
4113
4114    fn thread_new_indirect(
4115        &mut self,
4116        instance: Instance,
4117        caller: RuntimeComponentInstanceIndex,
4118        func_ty_idx: TypeFuncIndex,
4119        start_func_table_idx: RuntimeTableIndex,
4120        start_func_idx: u32,
4121        context: i32,
4122    ) -> Result<u32> {
4123        instance.thread_new_indirect(
4124            StoreContextMut(self),
4125            caller,
4126            func_ty_idx,
4127            start_func_table_idx,
4128            start_func_idx,
4129            context,
4130        )
4131    }
4132}
4133
4134type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4135
4136/// Represents the state of a pending host task.
4137struct HostTask {
4138    common: WaitableCommon,
4139    caller_instance: RuntimeInstance,
4140    join_handle: Option<JoinHandle>,
4141}
4142
4143impl HostTask {
4144    fn new(caller_instance: RuntimeInstance, join_handle: Option<JoinHandle>) -> Self {
4145        Self {
4146            common: WaitableCommon::default(),
4147            caller_instance,
4148            join_handle,
4149        }
4150    }
4151}
4152
4153impl TableDebug for HostTask {
4154    fn type_name() -> &'static str {
4155        "HostTask"
4156    }
4157}
4158
/// Type-erased closure used to invoke the callback of an async-lifted export,
/// passing it the `Event` to deliver along with the associated waitable handle
/// and returning the callback's raw return code.
4159type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4160
4161/// Represents the caller of a given guest task.
4162enum Caller {
4163    /// The host called the guest task.
4164    Host {
4165        /// If present, may be used to deliver the result.
4166        tx: Option<oneshot::Sender<LiftedResult>>,
4167        /// Channel to notify once all subtasks spawned by this caller have
4168        /// completed.
4169        ///
4170        /// Note that we'll never actually send anything to this channel;
4171        /// dropping it when the refcount goes to zero is sufficient to notify
4172        /// the receiver.
4173        exit_tx: Arc<oneshot::Sender<()>>,
4174        /// If true, there's a host future that must be dropped before the task
4175        /// can be deleted.
4176        host_future_present: bool,
4177        /// If `Some`, represents the `QualifiedThreadId` caller of the host
4178        /// function which called back into a guest.  Note that this thread
4179        /// may belong to a top-level component instance entirely unrelated to
4180        /// the one the host called into.
4181        caller: Option<QualifiedThreadId>,
4182    },
4183    /// Another guest thread called the guest task
4184    Guest {
4185        /// The id of the caller
4186        thread: QualifiedThreadId,
4187    },
4188}
4189
4190/// Represents a closure and related canonical ABI parameters required to
4191/// validate a `task.return` call at runtime and lift the result.
4192struct LiftResult {
4193    lift: RawLift,
4194    ty: TypeTupleIndex,
4195    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4196    string_encoding: StringEncoding,
4197}
4198
4199/// The table ID for a guest thread, qualified by the task to which it belongs.
4200///
4201/// This exists to minimize table lookups and the need to pass stores around
4202/// mutably for the common case of identifying the task to which a thread belongs.
4203#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4204struct QualifiedThreadId {
4205    task: TableId<GuestTask>,
4206    thread: TableId<GuestThread>,
4207}
4208
4209impl QualifiedThreadId {
4210    fn qualify(
4211        state: &mut ConcurrentState,
4212        thread: TableId<GuestThread>,
4213    ) -> Result<QualifiedThreadId> {
4214        Ok(QualifiedThreadId {
4215            task: state.get_mut(thread)?.parent_task,
4216            thread,
4217        })
4218    }
4219}
4220
4221impl fmt::Debug for QualifiedThreadId {
4222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4223        f.debug_tuple("QualifiedThreadId")
4224            .field(&self.task.rep())
4225            .field(&self.thread.rep())
4226            .finish()
4227    }
4228}
4229
/// The current execution state of a guest thread.
4230enum GuestThreadState {
4231    NotStartedImplicit,
4232    NotStartedExplicit(
4233        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4234    ),
4235    Running,
4236    Suspended(StoreFiber<'static>),
4237    Pending,
4238    Completed,
4239}

/// Represents a single guest thread belonging to a `GuestTask`.
4240pub struct GuestThread {
4241    /// Context-local state used to implement the `context.{get,set}`
4242    /// intrinsics.
4243    context: [u32; 2],
4244    /// The owning guest task.
4245    parent_task: TableId<GuestTask>,
4246    /// If present, indicates that the thread is currently waiting on the
4247    /// specified set but may be cancelled and woken immediately.
4248    wake_on_cancel: Option<TableId<WaitableSet>>,
4249    /// The execution state of this guest thread
4250    state: GuestThreadState,
4251    /// The index of this thread in the component instance's handle table.
4252    /// This must always be `Some` after initialization.
4253    instance_rep: Option<u32>,
4254}
4255
4256impl GuestThread {
4257    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4258    /// handle.
4259    fn from_instance(
4260        state: Pin<&mut ComponentInstance>,
4261        caller_instance: RuntimeComponentInstanceIndex,
4262        guest_thread: u32,
4263    ) -> Result<TableId<Self>> {
4264        let rep = state.instance_states().0[caller_instance]
4265            .thread_handle_table()
4266            .guest_thread_rep(guest_thread)?;
4267        Ok(TableId::new(rep))
4268    }
4269
4270    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4271        Self {
4272            context: [0; 2],
4273            parent_task,
4274            wake_on_cancel: None,
4275            state: GuestThreadState::NotStartedImplicit,
4276            instance_rep: None,
4277        }
4278    }
4279
4280    fn new_explicit(
4281        parent_task: TableId<GuestTask>,
4282        start_func: Box<
4283            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4284        >,
4285    ) -> Self {
4286        Self {
4287            context: [0; 2],
4288            parent_task,
4289            wake_on_cancel: None,
4290            state: GuestThreadState::NotStartedExplicit(start_func),
4291            instance_rep: None,
4292        }
4293    }
4294}
4295
4296impl TableDebug for GuestThread {
4297    fn type_name() -> &'static str {
4298        "GuestThread"
4299    }
4300}
4301
/// The state of the result slot for a sync-to-async call (see
/// `GuestTask::sync_result`).
4302enum SyncResult {
    /// No result has been produced yet.
4303    NotProduced,
    /// A result has been produced but not yet taken by the caller.
4304    Produced(Option<ValRaw>),
    /// The result has already been taken.
4305    Taken,
4306}
4307
4308impl SyncResult {
4309    fn take(&mut self) -> Option<Option<ValRaw>> {
4310        match mem::replace(self, SyncResult::Taken) {
4311            SyncResult::NotProduced => None,
4312            SyncResult::Produced(val) => Some(val),
4313            SyncResult::Taken => {
4314                panic!("attempted to take a synchronous result that was already taken")
4315            }
4316        }
4317    }
4318}
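
// State transitions (an illustrative sketch, not an exhaustive description): a
// sync-to-async call starts out as `NotProduced`; once the callee's lowered
// result has been stashed it becomes `Produced(..)`; the synchronous caller's
// `take` then moves the slot to `Taken`, and a second `take` would panic.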
4319
/// The state of the host future associated with a guest task, if any (see
/// `GuestTask::host_future_state`).
4320#[derive(Debug)]
4321enum HostFutureState {
    /// No host future applies to this task (e.g. it was called by another
    /// guest task, or no host future was created for it).
4322    NotApplicable,
    /// The host future is still live; the task cannot be deleted until it is
    /// dropped.
4323    Live,
    /// The host future has been dropped.
4324    Dropped,
4325}
4326
4327/// Represents a pending guest task.
4328pub(crate) struct GuestTask {
4329    /// See `WaitableCommon`
4330    common: WaitableCommon,
4331    /// Closure to lower the parameters passed to this task.
4332    lower_params: Option<RawLower>,
4333    /// See `LiftResult`
4334    lift_result: Option<LiftResult>,
4335    /// A place to stash the type-erased lifted result if it can't be delivered
4336    /// immediately.
4337    result: Option<LiftedResult>,
4338    /// Closure to call the callback function for an async-lifted export, if
4339    /// provided.
4340    callback: Option<CallbackFn>,
4341    /// See `Caller`
4342    caller: Caller,
4343    /// A place to stash the call context for managing resource borrows while
4344    /// switching between guest tasks.
4345    call_context: Option<CallContext>,
4346    /// A place to stash the lowered result for a sync-to-async call until it
4347    /// can be returned to the caller.
4348    sync_result: SyncResult,
4349    /// Whether or not the task has been cancelled (i.e. whether the task is
4350    /// permitted to call `task.cancel`).
4351    cancel_sent: bool,
4352    /// Whether or not we've sent a `Status::Starting` event to any current or
4353    /// future waiters for this waitable.
4354    starting_sent: bool,
4355    /// Pending guest subtasks created by this task (directly or indirectly).
4356    ///
4357    /// This is used to re-parent subtasks which are still running when their
4358    /// parent task is disposed.
4359    subtasks: HashSet<TableId<GuestTask>>,
4360    /// Scratch waitable set used to watch subtasks during synchronous calls.
4361    sync_call_set: TableId<WaitableSet>,
4362    /// The runtime instance to which the exported function for this guest task
4363    /// belongs.
4364    ///
4365    /// Note that the task may do a sync->sync call via a fused adapter which
4366    /// results in that task executing code in a different instance, and it may
4367    /// call host functions and intrinsics from that other instance.
4368    instance: RuntimeInstance,
4369    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4370    /// delivered to this task.
4371    event: Option<Event>,
4372    /// The `ExportIndex` of the guest function being called, if known.
4373    function_index: Option<ExportIndex>,
4374    /// Whether or not the task has exited.
4375    exited: bool,
4376    /// Threads belonging to this task
4377    threads: HashSet<TableId<GuestThread>>,
4378    /// The state of the host future that represents an async task, which must
4379    /// be dropped before we can delete the task.
4380    host_future_state: HostFutureState,
4381    /// Indicates whether this task was created for a call to an async-lifted
4382    /// export.
4383    async_function: bool,
4384}
4385
4386impl GuestTask {
4387    fn already_lowered_parameters(&self) -> bool {
4388        // We reset `lower_params` after we lower the parameters
4389        self.lower_params.is_none()
4390    }
4391
4392    fn returned_or_cancelled(&self) -> bool {
4393        // We reset `lift_result` after we return or exit
4394        self.lift_result.is_none()
4395    }
4396
4397    fn ready_to_delete(&self) -> bool {
4398        let threads_completed = self.threads.is_empty();
4399        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4400        let pending_completion_event = matches!(
4401            self.common.event,
4402            Some(Event::Subtask {
4403                status: Status::Returned | Status::ReturnCancelled
4404            })
4405        );
4406        let ready = threads_completed
4407            && !has_sync_result
4408            && !pending_completion_event
4409            && !matches!(self.host_future_state, HostFutureState::Live);
4410        log::trace!(
4411            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4412            threads_completed,
4413            has_sync_result,
4414            pending_completion_event,
4415            self.host_future_state
4416        );
4417        ready
4418    }
4419
4420    fn new(
4421        state: &mut ConcurrentState,
4422        lower_params: RawLower,
4423        lift_result: LiftResult,
4424        caller: Caller,
4425        callback: Option<CallbackFn>,
4426        instance: RuntimeInstance,
4427        async_function: bool,
4428    ) -> Result<Self> {
4429        let sync_call_set = state.push(WaitableSet::default())?;
4430        let host_future_state = match &caller {
4431            Caller::Guest { .. } => HostFutureState::NotApplicable,
4432            Caller::Host {
4433                host_future_present,
4434                ..
4435            } => {
4436                if *host_future_present {
4437                    HostFutureState::Live
4438                } else {
4439                    HostFutureState::NotApplicable
4440                }
4441            }
4442        };
4443        Ok(Self {
4444            common: WaitableCommon::default(),
4445            lower_params: Some(lower_params),
4446            lift_result: Some(lift_result),
4447            result: None,
4448            callback,
4449            caller,
4450            call_context: Some(CallContext::default()),
4451            sync_result: SyncResult::NotProduced,
4452            cancel_sent: false,
4453            starting_sent: false,
4454            subtasks: HashSet::new(),
4455            sync_call_set,
4456            instance,
4457            event: None,
4458            function_index: None,
4459            exited: false,
4460            threads: HashSet::new(),
4461            host_future_state,
4462            async_function,
4463        })
4464    }
4465
4466    /// Dispose of this guest task, reparenting any pending subtasks to the
4467    /// caller.
4468    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4469        // If there are not-yet-delivered completion events for subtasks in
4470        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4471        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4472            if let Some(Event::Subtask {
4473                status: Status::Returned | Status::ReturnCancelled,
4474            }) = waitable.common(state)?.event
4475            {
4476                waitable.delete_from(state)?;
4477            }
4478        }
4479
4480        assert!(self.threads.is_empty());
4481
4482        state.delete(self.sync_call_set)?;
4483
4484        // Reparent any pending subtasks to the caller.
4485        match &self.caller {
4486            Caller::Guest { thread } => {
4487                let task_mut = state.get_mut(thread.task)?;
4488                let present = task_mut.subtasks.remove(&me);
4489                assert!(present);
4490
4491                for subtask in &self.subtasks {
4492                    task_mut.subtasks.insert(*subtask);
4493                }
4494
4495                for subtask in &self.subtasks {
4496                    state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4497                }
4498            }
4499            Caller::Host {
4500                exit_tx, caller, ..
4501            } => {
4502                for subtask in &self.subtasks {
4503                    state.get_mut(*subtask)?.caller = Caller::Host {
4504                        tx: None,
4505                        // Clone `exit_tx` to ensure that it is only dropped
4506                        // once all transitive subtasks of the host call have
4507                        // exited:
4508                        exit_tx: exit_tx.clone(),
4509                        host_future_present: false,
4510                        caller: *caller,
4511                    };
4512                }
4513            }
4514        }
4515
4516        for subtask in self.subtasks {
4517            let task = state.get_mut(subtask)?;
4518            if task.exited && task.ready_to_delete() {
4519                Waitable::Guest(subtask).delete_from(state)?;
4520            }
4521        }
4522
4523        Ok(())
4524    }
4525}
4526
4527impl TableDebug for GuestTask {
4528    fn type_name() -> &'static str {
4529        "GuestTask"
4530    }
4531}
4532
4533/// Represents state common to all kinds of waitables.
4534#[derive(Default)]
4535struct WaitableCommon {
4536    /// The currently pending event for this waitable, if any.
4537    event: Option<Event>,
4538    /// The set to which this waitable belongs, if any.
4539    set: Option<TableId<WaitableSet>>,
4540    /// The handle with which the guest refers to this waitable, if any.
4541    handle: Option<u32>,
4542}
4543
4544/// Represents a Component Model Async `waitable`.
4545#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4546enum Waitable {
4547    /// A host task
4548    Host(TableId<HostTask>),
4549    /// A guest task
4550    Guest(TableId<GuestTask>),
4551    /// The read or write end of a stream or future
4552    Transmit(TableId<TransmitHandle>),
4553}
4554
4555impl Waitable {
4556    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4557    /// handle.
4558    fn from_instance(
4559        state: Pin<&mut ComponentInstance>,
4560        caller_instance: RuntimeComponentInstanceIndex,
4561        waitable: u32,
4562    ) -> Result<Self> {
4563        use crate::runtime::vm::component::Waitable;
4564
4565        let (waitable, kind) = state.instance_states().0[caller_instance]
4566            .handle_table()
4567            .waitable_rep(waitable)?;
4568
4569        Ok(match kind {
4570            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4571            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4572            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4573        })
4574    }
4575
4576    /// Retrieve the host-visible identifier for this `Waitable`.
4577    fn rep(&self) -> u32 {
4578        match self {
4579            Self::Host(id) => id.rep(),
4580            Self::Guest(id) => id.rep(),
4581            Self::Transmit(id) => id.rep(),
4582        }
4583    }
4584
4585    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4586    /// remove it from any set it may currently belong to (when `set` is
4587    /// `None`).
4588    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4589        log::trace!("waitable {self:?} join set {set:?}");
4590
4591        let old = mem::replace(&mut self.common(state)?.set, set);
4592
4593        if let Some(old) = old {
4594            match *self {
4595                Waitable::Host(id) => state.remove_child(id, old),
4596                Waitable::Guest(id) => state.remove_child(id, old),
4597                Waitable::Transmit(id) => state.remove_child(id, old),
4598            }?;
4599
4600            state.get_mut(old)?.ready.remove(self);
4601        }
4602
4603        if let Some(set) = set {
4604            match *self {
4605                Waitable::Host(id) => state.add_child(id, set),
4606                Waitable::Guest(id) => state.add_child(id, set),
4607                Waitable::Transmit(id) => state.add_child(id, set),
4608            }?;
4609
4610            if self.common(state)?.event.is_some() {
4611                self.mark_ready(state)?;
4612            }
4613        }
4614
4615        Ok(())
4616    }
4617
4618    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4619    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4620        Ok(match self {
4621            Self::Host(id) => &mut state.get_mut(*id)?.common,
4622            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4623            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4624        })
4625    }
4626
4627    /// Set or clear the pending event for this waitable and either deliver it
4628    /// to the first waiter, if any, or mark it as ready to be delivered to the
4629    /// next waiter that arrives.
4630    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4631        log::trace!("set event for {self:?}: {event:?}");
4632        self.common(state)?.event = event;
4633        self.mark_ready(state)
4634    }
4635
4636    /// Take the pending event from this waitable, leaving `None` in its place.
4637    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4638        let common = self.common(state)?;
4639        let event = common.event.take();
4640        if let Some(set) = self.common(state)?.set {
4641            state.get_mut(set)?.ready.remove(self);
4642        }
4643
4644        Ok(event)
4645    }
4646
4647    /// Deliver the current event for this waitable to the first waiter, if any,
4648    /// or else mark it as ready to be delivered to the next waiter that
4649    /// arrives.
4650    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4651        if let Some(set) = self.common(state)?.set {
4652            state.get_mut(set)?.ready.insert(*self);
4653            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4654                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4655                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4656
4657                let item = match mode {
4658                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4659                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4660                        thread,
4661                        kind: GuestCallKind::DeliverEvent {
4662                            instance,
4663                            set: Some(set),
4664                        },
4665                    }),
4666                };
4667                state.push_high_priority(item);
4668            }
4669        }
4670        Ok(())
4671    }
4672
4673    /// Remove this waitable from the instance's rep table.
4674    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4675        match self {
4676            Self::Host(task) => {
4677                log::trace!("delete host task {task:?}");
4678                state.delete(*task)?;
4679            }
4680            Self::Guest(task) => {
4681                log::trace!("delete guest task {task:?}");
4682                state.delete(*task)?.dispose(state, *task)?;
4683            }
4684            Self::Transmit(task) => {
4685                state.delete(*task)?;
4686            }
4687        }
4688
4689        Ok(())
4690    }
4691}
4692
4693impl fmt::Debug for Waitable {
4694    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4695        match self {
4696            Self::Host(id) => write!(f, "{id:?}"),
4697            Self::Guest(id) => write!(f, "{id:?}"),
4698            Self::Transmit(id) => write!(f, "{id:?}"),
4699        }
4700    }
4701}
4702
4703/// Represents a Component Model Async `waitable-set`.
4704#[derive(Default)]
4705struct WaitableSet {
4706    /// Which waitables in this set have pending events, if any.
4707    ready: BTreeSet<Waitable>,
4708    /// Which guest threads are currently waiting on this set, if any.
4709    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4710}
4711
4712impl TableDebug for WaitableSet {
4713    fn type_name() -> &'static str {
4714        "WaitableSet"
4715    }
4716}
4717
4718/// Type-erased closure to lower the parameters for a guest task.
4719type RawLower =
4720    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4721
4722/// Type-erased closure to lift the result for a guest task.
4723type RawLift = Box<
4724    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4725>;
4726
4727/// Type-erased result of a guest task which may be downcast to the expected
4728/// type by a host caller (or simply ignored in the case of a guest caller; see
4729/// `DummyResult`).
4730type LiftedResult = Box<dyn Any + Send + Sync>;
4731
4732/// Used to return a result from a `LiftFn` when the actual result has already
4733/// been lowered to a guest task's stack and linear memory.
4734struct DummyResult;
4735
4736/// Represents the Component Model Async state of a (sub-)component instance.
4737#[derive(Default)]
4738pub struct ConcurrentInstanceState {
4739    /// Backpressure counter for this instance (backpressure is enabled while >0).
4740    backpressure: u16,
4741    /// Whether this instance may not currently be entered.
4742    do_not_enter: bool,
4743    /// Pending calls for this instance which require `Self::backpressure` to be
4744    /// zero and/or `Self::do_not_enter` to be `false` before they can proceed.
4745    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4746}
4747
4748impl ConcurrentInstanceState {
4749    pub fn pending_is_empty(&self) -> bool {
4750        self.pending.is_empty()
4751    }
4752}
4753
4754/// Represents the Component Model Async state of a store.
4755pub struct ConcurrentState {
4756    /// The currently running guest thread, if any.
4757    guest_thread: Option<QualifiedThreadId>,
4758
4759    /// The set of pending host and background tasks, if any.
4760    ///
4761    /// See `ComponentInstance::poll_until` for where we temporarily take this
4762    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4763    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4764    /// The table of waitables, waitable sets, etc.
4765    table: AlwaysMut<ResourceTable>,
4766    /// The "high priority" work queue for this store's event loop.
4767    high_priority: Vec<WorkItem>,
4768    /// The "low priority" work queue for this store's event loop.
4769    low_priority: VecDeque<WorkItem>,
4770    /// A place to stash the reason a fiber is suspending so that the code which
4771    /// resumed it will know under what conditions the fiber should be resumed
4772    /// again.
4773    suspend_reason: Option<SuspendReason>,
4774    /// A cached fiber which is waiting for work to do.
4775    ///
4776    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4777    worker: Option<StoreFiber<'static>>,
4778    /// A place to stash the work item for which we're resuming a worker fiber.
4779    worker_item: Option<WorkerItem>,
4780
4781    /// Reference counts for all component error contexts
4782    ///
4783    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
4784    /// (sub)component ref counts as tracked by `error_context_tables`, for
4785    /// example when the host holds one or more references to error contexts.
4786    ///
4787    /// The key of this map is often referred to as the "rep" (i.e. host-side
4788    /// component-wide representation) of the index into concurrent state for a given
4789    /// stored `ErrorContext`.
4790    ///
4791    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4792    /// as a `TableId<ErrorContextState>`.
4793    global_error_context_ref_counts:
4794        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4795}
4796
4797impl Default for ConcurrentState {
4798    fn default() -> Self {
4799        Self {
4800            guest_thread: None,
4801            table: AlwaysMut::new(ResourceTable::new()),
4802            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4803            high_priority: Vec::new(),
4804            low_priority: VecDeque::new(),
4805            suspend_reason: None,
4806            worker: None,
4807            worker_item: None,
4808            global_error_context_ref_counts: BTreeMap::new(),
4809        }
4810    }
4811}
4812
4813impl ConcurrentState {
4814    /// Take ownership of any fibers and futures owned by this object.
4815    ///
4816    /// This should be used when disposing of the `Store` containing this object
4817    /// in order to gracefully resolve any and all fibers using
4818    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4819    /// use-after-free bugs due to fibers which may still have access to the
4820    /// `Store`.
4821    ///
4822    /// Additionally, the futures collected with this function should be dropped
4823    /// within a `tls::set` call, which will ensure that any futures closing
4824    /// over an `&Accessor` will have access to the store when dropped, allowing
4825    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4826    /// panicking.
4827    ///
4828    /// Note that this will leave the object in an inconsistent and unusable
4829    /// state, so it should only be used just prior to dropping it.
4830    pub(crate) fn take_fibers_and_futures(
4831        &mut self,
4832        fibers: &mut Vec<StoreFiber<'static>>,
4833        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4834    ) {
4835        for entry in self.table.get_mut().iter_mut() {
4836            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4837                for mode in mem::take(&mut set.waiting).into_values() {
4838                    if let WaitMode::Fiber(fiber) = mode {
4839                        fibers.push(fiber);
4840                    }
4841                }
4842            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4843                if let GuestThreadState::Suspended(fiber) =
4844                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4845                {
4846                    fibers.push(fiber);
4847                }
4848            }
4849        }
4850
4851        if let Some(fiber) = self.worker.take() {
4852            fibers.push(fiber);
4853        }
4854
4855        let mut handle_item = |item| match item {
4856            WorkItem::ResumeFiber(fiber) => {
4857                fibers.push(fiber);
4858            }
4859            WorkItem::PushFuture(future) => {
4860                self.futures
4861                    .get_mut()
4862                    .as_mut()
4863                    .unwrap()
4864                    .push(future.into_inner());
4865            }
4866            _ => {}
4867        };
4868
4869        for item in mem::take(&mut self.high_priority) {
4870            handle_item(item);
4871        }
4872        for item in mem::take(&mut self.low_priority) {
4873            handle_item(item);
4874        }
4875
4876        if let Some(them) = self.futures.get_mut().take() {
4877            futures.push(them);
4878        }
4879    }
4880
4881    /// Collect the next set of work items to run. This will be either all
4882    /// high-priority items, or a single low-priority item if there are no
4883    /// high-priority items.
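    ///
    /// Illustrative example: if `high_priority` currently holds `[A, B]` and
    /// `low_priority` holds `[C, D]`, this returns `[A, B]`; only once the
    /// high-priority queue is empty will a single low-priority item be returned
    /// per call.  Low-priority items are pushed at the front and popped from the
    /// back, so they are handed out in FIFO order.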
4884    fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4885        let mut ready = mem::take(&mut self.high_priority);
4886        if ready.is_empty() {
4887            if let Some(item) = self.low_priority.pop_back() {
4888                ready.push(item);
4889            }
4890        }
4891        ready
4892    }
4893
4894    fn push<V: Send + Sync + 'static>(
4895        &mut self,
4896        value: V,
4897    ) -> Result<TableId<V>, ResourceTableError> {
4898        self.table.get_mut().push(value).map(TableId::from)
4899    }
4900
4901    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4902        self.table.get_mut().get_mut(&Resource::from(id))
4903    }
4904
4905    pub fn add_child<T: 'static, U: 'static>(
4906        &mut self,
4907        child: TableId<T>,
4908        parent: TableId<U>,
4909    ) -> Result<(), ResourceTableError> {
4910        self.table
4911            .get_mut()
4912            .add_child(Resource::from(child), Resource::from(parent))
4913    }
4914
4915    pub fn remove_child<T: 'static, U: 'static>(
4916        &mut self,
4917        child: TableId<T>,
4918        parent: TableId<U>,
4919    ) -> Result<(), ResourceTableError> {
4920        self.table
4921            .get_mut()
4922            .remove_child(Resource::from(child), Resource::from(parent))
4923    }
4924
4925    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4926        self.table.get_mut().delete(Resource::from(id))
4927    }
4928
4929    fn push_future(&mut self, future: HostTaskFuture) {
4930        // Note that we can't directly push to `ConcurrentState::futures` here
4931        // since this may be called from a future that's being polled inside
4932        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4933        // so it has exclusive access while polling it.  Therefore, we push a
4934        // work item to the "high priority" queue, which will actually push to
4935        // `ConcurrentState::futures` later.
4936        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4937    }
4938
4939    fn push_high_priority(&mut self, item: WorkItem) {
4940        log::trace!("push high priority: {item:?}");
4941        self.high_priority.push(item);
4942    }
4943
4944    fn push_low_priority(&mut self, item: WorkItem) {
4945        log::trace!("push low priority: {item:?}");
4946        self.low_priority.push_front(item);
4947    }
4948
4949    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4950        if high_priority {
4951            self.push_high_priority(item);
4952        } else {
4953            self.push_low_priority(item);
4954        }
4955    }
4956
4957    /// Implements the `context.get` intrinsic.
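    ///
    /// Each guest thread carries two `u32` context slots (see
    /// `GuestThread::context`), so `slot` is expected to be 0 or 1; a
    /// `context.set(s, v)` on a thread followed by `context.get(s)` on the same
    /// thread yields `v`.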
4958    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4959        let thread = self.guest_thread.unwrap();
4960        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4961        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4962        Ok(val)
4963    }
4964
4965    /// Implements the `context.set` intrinsic.
4966    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4967        let thread = self.guest_thread.unwrap();
4968        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4969        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4970        Ok(())
4971    }
4972
4973    /// Returns whether there's a pending cancellation on the current guest thread,
4974    /// consuming the event if so.
4975    fn take_pending_cancellation(&mut self) -> bool {
4976        let thread = self.guest_thread.unwrap();
4977        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4978            assert!(matches!(event, Event::Cancelled));
4979            true
4980        } else {
4981            false
4982        }
4983    }
4984
4985    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4986        if self.may_block(task) {
4987            Ok(())
4988        } else {
4989            Err(Trap::CannotBlockSyncTask.into())
4990        }
4991    }
4992
4993    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4994        let task = self.get_mut(task).unwrap();
4995        task.async_function || task.returned_or_cancelled()
4996    }
4997}
4998
4999/// Provide a type hint to the compiler about the shape of a parameter-lowering
5000/// closure.
5001fn for_any_lower<
5002    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5003>(
5004    fun: F,
5005) -> F {
5006    fun
5007}
5008
5009/// Provide a type hint to the compiler about the shape of a result-lifting closure.
5010fn for_any_lift<
5011    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5012>(
5013    fun: F,
5014) -> F {
5015    fun
5016}
5017
5018/// Wrap the specified future in a `poll_fn` which asserts that the future is
5019/// only polled from the event loop of the specified `Store`.
5020///
5021/// See `StoreContextMut::run_concurrent` for details.
5022fn checked<F: Future + Send + 'static>(
5023    id: StoreId,
5024    fut: F,
5025) -> impl Future<Output = F::Output> + Send + 'static {
5026    async move {
5027        let mut fut = pin!(fut);
5028        future::poll_fn(move |cx| {
5029            let message = "\
5030                `Future`s which depend on asynchronous component tasks, streams, or \
5031                futures to complete may only be polled from the event loop of the \
5032                store to which they belong.  Please use \
5033                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5034            ";
5035            tls::try_get(|store| {
5036                let matched = match store {
5037                    tls::TryGet::Some(store) => store.id() == id,
5038                    tls::TryGet::Taken | tls::TryGet::None => false,
5039                };
5040
5041                if !matched {
5042                    panic!("{message}")
5043                }
5044            });
5045            fut.as_mut().poll(cx)
5046        })
5047        .await
5048    }
5049}
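
// Note (descriptive, not normative): `checked` only adds a runtime guard; the
// wrapped future's output is unchanged.  The guard consults the thread-local
// store pointer (see `tls`) that is set while the store's event loop runs, so
// polling the returned future from any other context panics with the message
// above.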
5050
5051/// Assert that `StoreContextMut::run_concurrent` has not been called from
5052/// within a store's event loop.
5053fn check_recursive_run() {
5054    tls::try_get(|store| {
5055        if !matches!(store, tls::TryGet::None) {
5056            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5057        }
5058    });
5059}
5060
5061fn unpack_callback_code(code: u32) -> (u32, u32) {
5062    (code & 0xF, code >> 4)
5063}
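
// A minimal sanity check (illustrative values only) of the split performed
// above: the low four bits of a callback's return value carry the code and the
// remaining bits carry the payload.
#[cfg(test)]
mod unpack_callback_code_tests {
    use super::unpack_callback_code;

    #[test]
    fn splits_low_nibble_from_payload() {
        assert_eq!(unpack_callback_code(0x0), (0x0, 0x0));
        assert_eq!(unpack_callback_code(0x21), (0x1, 0x2));
        assert_eq!(unpack_callback_code(0xABCD_EF12), (0x2, 0x0ABC_DEF1));
    }
}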
5064
5065/// Helper struct for packaging parameters to be passed to
5066/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5067/// `waitable-set.poll`.
5068struct WaitableCheckParams {
5069    set: TableId<WaitableSet>,
5070    options: OptionsIndex,
5071    payload: u32,
5072}
5073
5074/// Indicates whether `ComponentInstance::waitable_check` is being called for
5075/// `waitable-set.wait` or `waitable-set.poll`.
5076enum WaitableCheck {
5077    Wait,
5078    Poll,
5079}
5080
5081/// Represents a guest task called from the host, prepared using `prepare_call`.
5082pub(crate) struct PreparedCall<R> {
5083    /// The guest export to be called
5084    handle: Func,
5085    /// The guest thread created by `prepare_call`
5086    thread: QualifiedThreadId,
5087    /// The number of lowered core Wasm parameters to pass to the call.
5088    param_count: usize,
5089    /// The `oneshot::Receiver` to which the result of the call will be
5090    /// delivered when it is available.
5091    rx: oneshot::Receiver<LiftedResult>,
5092    /// The `oneshot::Receiver` which will resolve when the task -- and any
5093    /// transitive subtasks -- have all exited.
5094    exit_rx: oneshot::Receiver<()>,
5095    _phantom: PhantomData<R>,
5096}
5097
5098impl<R> PreparedCall<R> {
5099    /// Get a copy of the `TaskId` for this `PreparedCall`.
5100    pub(crate) fn task_id(&self) -> TaskId {
5101        TaskId {
5102            task: self.thread.task,
5103        }
5104    }
5105}
5106
5107/// Represents a task created by `prepare_call`.
5108pub(crate) struct TaskId {
5109    task: TableId<GuestTask>,
5110}
5111
5112impl TaskId {
5113    /// The host future for an async task was dropped.
5114    ///
5115    /// If the parameters have not been lowered yet, it is no longer valid to do
5116    /// so, as the lowering closure would see a dangling pointer; in this case we
5117    /// delete the task eagerly.  Otherwise, there may be running threads, or ones
    /// that are suspended and can be resumed by other tasks for this component,
    /// so we mark the future as dropped and delete the task when all threads are
    /// done.
5118    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5119        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5120        if !task.already_lowered_parameters() {
5121            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5122        } else {
5123            task.host_future_state = HostFutureState::Dropped;
5124            if task.ready_to_delete() {
5125                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5126            }
5127        }
5128        Ok(())
5129    }
5130}
5131
5132/// Prepare a call to the specified exported Wasm function, providing functions
5133/// for lowering the parameters and lifting the result.
5134///
5135/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5136/// loop, use `queue_call`.
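///
/// Typical flow (a sketch of this internal API as used within this crate):
/// `prepare_call` creates the `GuestTask` and its implicit `GuestThread`;
/// `queue_call` then enqueues the actual call in the event loop and returns a
/// future yielding the lifted result, which must be polled from the store's
/// event loop (e.g. via `StoreContextMut::run_concurrent`).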
5137pub(crate) fn prepare_call<T, R>(
5138    mut store: StoreContextMut<T>,
5139    handle: Func,
5140    param_count: usize,
5141    host_future_present: bool,
5142    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5143    + Send
5144    + Sync
5145    + 'static,
5146    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5147    + Send
5148    + Sync
5149    + 'static,
5150) -> Result<PreparedCall<R>> {
5151    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5152
5153    let instance = handle.instance().id().get(store.0);
5154    let options = &instance.component().env_component().options[options];
5155    let ty = &instance.component().types()[ty];
5156    let async_function = ty.async_;
5157    let task_return_type = ty.results;
5158    let component_instance = raw_options.instance;
5159    let callback = options.callback.map(|i| instance.runtime_callback(i));
5160    let memory = options
5161        .memory()
5162        .map(|i| instance.runtime_memory(i))
5163        .map(SendSyncPtr::new);
5164    let string_encoding = options.string_encoding;
5165    let token = StoreToken::new(store.as_context_mut());
5166    let state = store.0.concurrent_state_mut();
5167
5168    let (tx, rx) = oneshot::channel();
5169    let (exit_tx, exit_rx) = oneshot::channel();
5170
5171    let caller = state.guest_thread;
5172    let mut task = GuestTask::new(
5173        state,
5174        Box::new(for_any_lower(move |store, params| {
5175            lower_params(handle, token.as_context_mut(store), params)
5176        })),
5177        LiftResult {
5178            lift: Box::new(for_any_lift(move |store, result| {
5179                lift_result(handle, store, result)
5180            })),
5181            ty: task_return_type,
5182            memory,
5183            string_encoding,
5184        },
5185        Caller::Host {
5186            tx: Some(tx),
5187            exit_tx: Arc::new(exit_tx),
5188            host_future_present,
5189            caller,
5190        },
5191        callback.map(|callback| {
5192            let callback = SendSyncPtr::new(callback);
5193            let instance = handle.instance();
5194            Box::new(move |store: &mut dyn VMStore, event, handle| {
5195                let store = token.as_context_mut(store);
5196                // SAFETY: Per the contract of `prepare_call`, the callback
5197            // will remain valid at least as long as this task exists.
5198                unsafe { instance.call_callback(store, callback, event, handle) }
5199            }) as CallbackFn
5200        }),
5201        RuntimeInstance {
5202            instance: handle.instance().id().instance(),
5203            index: component_instance,
5204        },
5205        async_function,
5206    )?;
5207    task.function_index = Some(handle.index());
5208
5209    let task = state.push(task)?;
5210    let thread = state.push(GuestThread::new_implicit(task))?;
5211    state.get_mut(task)?.threads.insert(thread);
5212
5213    if !store.0.may_enter_task(task) {
5214        bail!(crate::Trap::CannotEnterComponent);
5215    }
5216
5217    Ok(PreparedCall {
5218        handle,
5219        thread: QualifiedThreadId { task, thread },
5220        param_count,
5221        rx,
5222        exit_rx,
5223        _phantom: PhantomData,
5224    })
5225}
5226
5227/// Queue a call previously prepared using `prepare_call` to be run as part of
5228/// the associated `ComponentInstance`'s event loop.
5229///
5230/// The returned future will resolve to the result once it is available, but
5231/// must only be polled via the instance's event loop. See
5232/// `StoreContextMut::run_concurrent` for details.
5233pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5234    mut store: StoreContextMut<T>,
5235    prepared: PreparedCall<R>,
5236) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5237    let PreparedCall {
5238        handle,
5239        thread,
5240        param_count,
5241        rx,
5242        exit_rx,
5243        ..
5244    } = prepared;
5245
5246    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5247
5248    Ok(checked(
5249        store.0.id(),
5250        rx.map(move |result| {
5251            result
5252                .map(|v| (*v.downcast().unwrap(), exit_rx))
5253                .map_err(crate::Error::from)
5254        }),
5255    ))
5256}
5257
5258/// Queue a call previously prepared using `prepare_call` to be run as part of
5259/// the associated `ComponentInstance`'s event loop.
5260fn queue_call0<T: 'static>(
5261    store: StoreContextMut<T>,
5262    handle: Func,
5263    guest_thread: QualifiedThreadId,
5264    param_count: usize,
5265) -> Result<()> {
5266    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5267    let is_concurrent = raw_options.async_;
5268    let callback = raw_options.callback;
5269    let instance = handle.instance();
5270    let callee = handle.lifted_core_func(store.0);
5271    let post_return = handle.post_return_core_func(store.0);
5272    let callback = callback.map(|i| {
5273        let instance = instance.id().get(store.0);
5274        SendSyncPtr::new(instance.runtime_callback(i))
5275    });
5276
5277    log::trace!("queueing call {guest_thread:?}");
5278
5279    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5280    // (with signatures appropriate for this call) and will remain valid as
5281    // long as this instance is valid.
5282    unsafe {
5283        instance.queue_call(
5284            store,
5285            guest_thread,
5286            SendSyncPtr::new(callee),
5287            param_count,
5288            1,
5289            is_concurrent,
5290            callback,
5291            post_return.map(SendSyncPtr::new),
5292        )
5293    }
5294}