1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async- or sync-lifted export, creating a guest task.
23//!
//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
//! store, allowing any and all tasks belonging to that store to make
//! progress.
//!
//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
//! for the specified store.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
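//!
//! The following sketch ties the pieces above together (illustrative only; it
//! assumes a component instance exposing an async-lifted export named
//! `"process"` which takes and returns a `string`):
//!
//! ```ignore
//! // Obtain a typed handle to the async-lifted export.
//! let process = instance.get_typed_func::<(String,), (String,)>(&mut store, "process")?;
//!
//! // Drive the store's event loop while the host->guest call is in flight.
//! let output = store
//!     .run_concurrent(async |accessor| -> wasmtime::Result<_> {
//!         let (output,) = process
//!             .call_concurrent(accessor, ("hello".to_string(),))
//!             .await?
//!             .0;
//!         Ok(output)
//!     })
//!     .await??;
//! ```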
52
53use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64    bail, error::format_err,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::sync::Arc;
84use std::task::{Context, Poll, Waker};
85use std::vec::Vec;
86use table::{TableDebug, TableId};
87use wasmtime_environ::Trap;
88use wasmtime_environ::component::{
89    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
90    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
91    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
92    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
93    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
94};
95use wasmtime_environ::packed_option::ReservedValue;
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Constant defined in the Component Model spec to indicate that an async
/// intrinsic (e.g. `future.write`) has not yet completed.
115const BLOCKED: u32 = 0xffff_ffff;
116
117/// Corresponds to `CallState` in the upstream spec.
118#[derive(Clone, Copy, Eq, PartialEq, Debug)]
119pub enum Status {
120    Starting = 0,
121    Started = 1,
122    Returned = 2,
123    StartCancelled = 3,
124    ReturnCancelled = 4,
125}
126
127impl Status {
128    /// Packs this status and the optional `waitable` provided into a 32-bit
129    /// result that the canonical ABI requires.
130    ///
131    /// The low 4 bits are reserved for the status while the upper 28 bits are
132    /// the waitable, if present.
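    ///
    /// For example, `Status::Started.pack(Some(3))` yields `(3 << 4) | 1 ==
    /// 0x31`, while `Status::Returned.pack(None)` yields `2`.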
133    pub fn pack(self, waitable: Option<u32>) -> u32 {
134        assert!(matches!(self, Status::Returned) == waitable.is_none());
135        let waitable = waitable.unwrap_or(0);
136        assert!(waitable < (1 << 28));
137        (waitable << 4) | (self as u32)
138    }
139}
140
141/// Corresponds to `EventCode` in the Component Model spec, plus related payload
142/// data.
143#[derive(Clone, Copy, Debug)]
144enum Event {
145    None,
146    Cancelled,
147    Subtask {
148        status: Status,
149    },
150    StreamRead {
151        code: ReturnCode,
152        pending: Option<(TypeStreamTableIndex, u32)>,
153    },
154    StreamWrite {
155        code: ReturnCode,
156        pending: Option<(TypeStreamTableIndex, u32)>,
157    },
158    FutureRead {
159        code: ReturnCode,
160        pending: Option<(TypeFutureTableIndex, u32)>,
161    },
162    FutureWrite {
163        code: ReturnCode,
164        pending: Option<(TypeFutureTableIndex, u32)>,
165    },
166}
167
168impl Event {
169    /// Lower this event to core Wasm integers for delivery to the guest.
170    ///
171    /// Note that the waitable handle, if any, is assumed to be lowered
172    /// separately.
173    fn parts(self) -> (u32, u32) {
174        const EVENT_NONE: u32 = 0;
175        const EVENT_SUBTASK: u32 = 1;
176        const EVENT_STREAM_READ: u32 = 2;
177        const EVENT_STREAM_WRITE: u32 = 3;
178        const EVENT_FUTURE_READ: u32 = 4;
179        const EVENT_FUTURE_WRITE: u32 = 5;
180        const EVENT_CANCELLED: u32 = 6;
181        match self {
182            Event::None => (EVENT_NONE, 0),
183            Event::Cancelled => (EVENT_CANCELLED, 0),
184            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189        }
190    }
191}
192
193/// Corresponds to `CallbackCode` in the spec.
194mod callback_code {
195    pub const EXIT: u32 = 0;
196    pub const YIELD: u32 = 1;
197    pub const WAIT: u32 = 2;
198}
199
/// A flag indicating that the callee is an async-lifted export.
201///
202/// This may be passed to the `async-start` intrinsic from a fused adapter.
203const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
205/// Provides access to either store data (via the `get` method) or the store
206/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
207/// instance to which the current host task belongs.
208///
209/// See [`Accessor::with`] for details.
210pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211    store: StoreContextMut<'a, T>,
212    get_data: fn(&mut T) -> D::Data<'_>,
213}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217    D: HasData + ?Sized,
218    T: 'static,
219{
220    /// Creates a new [`Access`] from its component parts.
221    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222        Self { store, get_data }
223    }
224
225    /// Get mutable access to the store data.
226    pub fn data_mut(&mut self) -> &mut T {
227        self.store.data_mut()
228    }
229
    /// Get mutable access to the data projected out of the store data by this
    /// accessor's getter (i.e. `D::Data`).
231    pub fn get(&mut self) -> D::Data<'_> {
232        (self.get_data)(self.data_mut())
233    }
234
235    /// Spawn a background task.
236    ///
237    /// See [`Accessor::spawn`] for details.
238    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239    where
240        T: 'static,
241    {
242        let accessor = Accessor {
243            get_data: self.get_data,
244            token: StoreToken::new(self.store.as_context_mut()),
245        };
246        self.store
247            .as_context_mut()
248            .spawn_with_accessor(accessor, task)
249    }
250
251    /// Returns the getter this accessor is using to project from `T` into
252    /// `D::Data`.
253    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254        self.get_data
255    }
256}
257
258impl<'a, T, D> AsContext for Access<'a, T, D>
259where
260    D: HasData + ?Sized,
261    T: 'static,
262{
263    type Data = T;
264
265    fn as_context(&self) -> StoreContext<'_, T> {
266        self.store.as_context()
267    }
268}
269
270impl<'a, T, D> AsContextMut for Access<'a, T, D>
271where
272    D: HasData + ?Sized,
273    T: 'static,
274{
275    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276        self.store.as_context_mut()
277    }
278}
279
280/// Provides scoped mutable access to store data in the context of a concurrent
281/// host task future.
282///
283/// This allows multiple host task futures to execute concurrently and access
284/// the store between (but not across) `await` points.
285///
286/// # Rationale
287///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. Unlike those, however, it does not literally store these
/// values. The basic problem is that when a concurrent host future is being
/// polled it has access to `&mut T` (and the whole `Store`), but when it's not
/// being polled it does not have access to these values. This reflects how
/// only one future is ever polled with the store at a time, so the store is
/// effectively passed between futures.
295///
296/// Rust's `Future` trait, however, has no means of passing a `Store`
297/// temporarily between futures. The [`Context`](std::task::Context) type does
298/// not have the ability to attach arbitrary information to it at this time.
299/// This type, [`Accessor`], is used to bridge this expressivity gap.
300///
301/// The [`Accessor`] type here represents the ability to acquire, temporarily in
302/// a synchronous manner, the current store. The [`Accessor::with`] function
303/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument; instead it takes a synchronous
/// closure which must complete during one call to `Future::poll`. This reflects
307/// how the store is temporarily made available while a host future is being
308/// polled.
309///
310/// # Implementation
311///
312/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
313/// this type additionally doesn't even have a lifetime parameter. This is
314/// instead a representation of proof of the ability to acquire these while a
315/// future is being polled. Wasmtime will, when it polls a host future,
316/// configure ambient state such that the `Accessor` that a future closes over
317/// will work and be able to access the store.
318///
319/// This has a number of implications for users such as:
320///
/// * It's intentional that `Accessor` cannot be cloned; it needs to stay within
///   the lifetime of a single future.
/// * A future is, however, expected to close over an `Accessor` and keep it
///   alive, probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T`, which
///   avoids some bounds that would otherwise have to be written down.
329///
330/// # Using `Accessor` in `Drop`
331///
332/// The methods on `Accessor` are only expected to work in the context of
333/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
334/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended
/// not to use `Accessor` methods in anything connected to a `Drop`
/// implementation, as they will panic or have unintended results. If you run
/// into this, though, feel free to file an issue on the Wasmtime repository.
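///
/// # Example
///
/// A minimal sketch of the intended pattern (illustrative only; it assumes
/// store data `Ctx` with a `hits: u64` counter and a hypothetical
/// `some_host_io` async helper):
///
/// ```ignore
/// store.run_concurrent(async |accessor| {
///     // The future closes over `accessor` and keeps it alive for its whole
///     // lifetime; the store itself is only borrowed inside `with`.
///     accessor.with(|mut access| access.get().hits += 1);
///     some_host_io().await; // the store is released while suspended
///     accessor.with(|mut access| access.get().hits += 1);
/// }).await?;
/// ```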
339pub struct Accessor<T: 'static, D = HasSelf<T>>
340where
341    D: HasData + ?Sized,
342{
343    token: StoreToken<T>,
344    get_data: fn(&mut T) -> D::Data<'_>,
345}
346
347/// A helper trait to take any type of accessor-with-data in functions.
348///
349/// This trait is similar to [`AsContextMut`] except that it's used when
350/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
351/// [`Accessor`] is the main type used in concurrent settings and is passed to
352/// functions such as [`Func::call_concurrent`].
353///
354/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
355/// this trait. This effectively means that regardless of the `D` in
356/// `Accessor<T, D>` it can still be passed to a function which just needs a
357/// store accessor.
358///
359/// Acquiring an [`Accessor`] can be done through
360/// [`StoreContextMut::run_concurrent`] for example or in a host function
361/// through
362/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
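///
/// For example (a sketch; `Ctx` is an assumed store data type with a `hits`
/// counter), a helper can accept any accessor over `Ctx` regardless of its
/// projection:
///
/// ```ignore
/// async fn log_hits(acc: impl AsAccessor<Data = Ctx>) {
///     let accessor = acc.as_accessor();
///     let hits = accessor.with(|mut access| access.data_mut().hits);
///     println!("hits so far: {hits}");
/// }
/// ```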
363pub trait AsAccessor {
364    /// The `T` in `Store<T>` that this accessor refers to.
365    type Data: 'static;
366
367    /// The `D` in `Accessor<T, D>`, or the projection out of
368    /// `Self::Data`.
369    type AccessorData: HasData + ?Sized;
370
371    /// Returns the accessor that this is referring to.
372    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376    type Data = T::Data;
377    type AccessorData = T::AccessorData;
378
379    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380        T::as_accessor(self)
381    }
382}
383
384impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385    type Data = T;
386    type AccessorData = D;
387
388    fn as_accessor(&self) -> &Accessor<T, D> {
389        self
390    }
391}
392
393// Note that it is intentional at this time that `Accessor` does not actually
394// store `&mut T` or anything similar. This distinctly enables the `Accessor`
395// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
396// that matter). This is used to ergonomically simplify bindings where the
397// majority of the time `Accessor` is closed over in a future which then needs
// to be `Send` and `Sync`. This design avoids the need to write `T: Send`
// everywhere (on top of the `T: 'static` that's already required).
400//
401// Note as well that `Accessor` doesn't actually store its data at all. Instead
402// it's more of a "proof" of what can be accessed from TLS. API design around
403// `Accessor` and functions like `Linker::func_wrap_concurrent` are
404// intentionally made to ensure that `Accessor` is ideally only used in the
405// context that TLS variables are actually set. For example host functions are
406// given `&Accessor`, not `Accessor`, and this prevents them from persisting
407// the value outside of a future. Within the future the TLS variables are all
408// guaranteed to be set while the future is being polled.
409//
// Finally, though, this is not an ironclad guarantee, nor does it need to be.
411// The TLS APIs are designed to panic or otherwise model usage where they're
412// called recursively or similar. It's hoped that code cannot be constructed to
413// actually hit this at runtime but this is not a safety requirement at this
414// time.
415const _: () = {
416    const fn assert<T: Send + Sync>() {}
417    assert::<Accessor<UnsafeCell<u32>>>();
418};
419
420impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by the specified
    /// token.
    ///
    /// The data projection defaults to the identity function, i.e. the
    /// accessor yields the store's `T` directly.
429    pub(crate) fn new(token: StoreToken<T>) -> Self {
430        Self {
431            token,
432            get_data: |x| x,
433        }
434    }
435}
436
437impl<T, D> Accessor<T, D>
438where
439    D: HasData + ?Sized,
440{
441    /// Run the specified closure, passing it mutable access to the store.
442    ///
443    /// This function is one of the main building blocks of the [`Accessor`]
444    /// type. This yields synchronous, blocking, access to the store via an
445    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
446    /// providing the ability to access `D` via [`Access::get`]. Note that the
447    /// `fun` here is given only temporary access to the store and `T`/`D`
448    /// meaning that the return value `R` here is not allowed to capture borrows
449    /// into the two. If access is needed to data within `T` or `D` outside of
450    /// this closure then it must be `clone`d out, for example.
451    ///
452    /// # Panics
453    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example, if `with` is called within `fun`,
456    /// then this function will panic. It is up to the embedder to ensure that
457    /// this does not happen.
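    ///
    /// A minimal sketch (illustrative only; it assumes store data `Ctx` with a
    /// `name: String` field and a hypothetical `send_greeting` async helper):
    ///
    /// ```ignore
    /// // Mutate the store data synchronously...
    /// accessor.with(|mut access| access.get().name.push('!'));
    ///
    /// // ...and clone data out when it's needed across an `await` point,
    /// // since `with` cannot return borrows into the store.
    /// let name = accessor.with(|mut access| access.get().name.clone());
    /// send_greeting(&name).await;
    /// ```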
458    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459        tls::get(|vmstore| {
460            fun(Access {
461                store: self.token.as_context_mut(vmstore),
462                get_data: self.get_data,
463            })
464        })
465    }
466
467    /// Returns the getter this accessor is using to project from `T` into
468    /// `D::Data`.
469    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470        self.get_data
471    }
472
473    /// Changes this accessor to access `D2` instead of the current type
474    /// parameter `D`.
475    ///
476    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
477    ///
478    /// # Panics
479    ///
480    /// When using this API the returned value is disconnected from `&self` and
481    /// the lifetime binding the `self` argument. An `Accessor` only works
482    /// within the context of the closure or async closure that it was
    /// originally given to, however. Because the returned value has no lifetime
    /// connection to `self`, it's possible to use it outside the scope of the
    /// original accessor, which will panic.
486    ///
487    /// The returned value should only be used within the scope of the original
488    /// `Accessor` that `self` refers to.
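    ///
    /// A sketch of a typical projection (illustrative only; it assumes store
    /// data `Ctx` with a `table: ResourceTable` field and a marker type
    /// `HasTable` implementing [`HasData`] with `Data<'a> = &'a mut ResourceTable`):
    ///
    /// ```ignore
    /// let table_accessor = accessor.with_getter::<HasTable>(|ctx| &mut ctx.table);
    /// let resource = table_accessor.with(|mut access| access.get().push(MyResource(42)))?;
    /// ```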
489    pub fn with_getter<D2: HasData>(
490        &self,
491        get_data: fn(&mut T) -> D2::Data<'_>,
492    ) -> Accessor<T, D2> {
493        Accessor {
494            token: self.token,
495            get_data,
496        }
497    }
498
499    /// Spawn a background task which will receive an `&Accessor<T, D>` and
500    /// run concurrently with any other tasks in progress for the current
501    /// store.
502    ///
503    /// This is particularly useful for host functions which return a `stream`
504    /// or `future` such that the code to write to the write end of that
505    /// `stream` or `future` must run after the function returns.
506    ///
507    /// The returned [`JoinHandle`] may be used to cancel the task.
508    ///
509    /// # Panics
510    ///
511    /// Panics if called within a closure provided to the [`Accessor::with`]
512    /// function. This can only be called outside an active invocation of
513    /// [`Accessor::with`].
514    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515    where
516        T: 'static,
517    {
518        let accessor = self.clone_for_spawn();
519        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520    }
521
522    fn clone_for_spawn(&self) -> Self {
523        Self {
524            token: self.token,
525            get_data: self.get_data,
526        }
527    }
528}
529
530/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
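///
/// A sketch of a manual implementation (illustrative only; it assumes store
/// data `Ctx` with a `log: Vec<String>` field):
///
/// ```ignore
/// struct LogTask(String);
///
/// impl AccessorTask<Ctx> for LogTask {
///     async fn run(self, accessor: &Accessor<Ctx>) -> Result<()> {
///         accessor.with(|mut access| access.get().log.push(self.0));
///         Ok(())
///     }
/// }
/// ```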
532// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
533// option.
534//
535// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
536// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
537// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
538// Send + Sync + 'static` fails with a type mismatch error when we try to pass
539// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
540// best we can do for the time being.
541pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
542where
543    D: HasData + ?Sized,
544{
545    /// Run the task.
546    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
547}
548
549/// Represents parameter and result metadata for the caller side of a
550/// guest->guest call orchestrated by a fused adapter.
551enum CallerInfo {
552    /// Metadata for a call to an async-lowered import
553    Async {
554        params: Vec<ValRaw>,
555        has_result: bool,
556    },
    /// Metadata for a call to a sync-lowered import
558    Sync {
559        params: Vec<ValRaw>,
560        result_count: u32,
561    },
562}
563
564/// Indicates how a guest task is waiting on a waitable set.
565enum WaitMode {
566    /// The guest task is waiting using `task.wait`
567    Fiber(StoreFiber<'static>),
568    /// The guest task is waiting via a callback declared as part of an
569    /// async-lifted export.
570    Callback(Instance),
571}
572
573/// Represents the reason a fiber is suspending itself.
574#[derive(Debug)]
575enum SuspendReason {
576    /// The fiber is waiting for an event to be delivered to the specified
577    /// waitable set or task.
578    Waiting {
579        set: TableId<WaitableSet>,
580        thread: QualifiedThreadId,
581        skip_may_block_check: bool,
582    },
583    /// The fiber has finished handling its most recent work item and is waiting
584    /// for another (or to be dropped if it is no longer needed).
585    NeedWork,
586    /// The fiber is yielding and should be resumed once other tasks have had a
587    /// chance to run.
588    Yielding {
589        thread: QualifiedThreadId,
590        skip_may_block_check: bool,
591    },
592    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
593    ExplicitlySuspending {
594        thread: QualifiedThreadId,
595        skip_may_block_check: bool,
596    },
597}
598
599/// Represents a pending call into guest code for a given guest task.
600enum GuestCallKind {
601    /// Indicates there's an event to deliver to the task, possibly related to a
602    /// waitable set the task has been waiting on or polling.
603    DeliverEvent {
604        /// The instance to which the task belongs.
605        instance: Instance,
606        /// The waitable set the event belongs to, if any.
607        ///
608        /// If this is `None` the event will be waiting in the
609        /// `GuestTask::event` field for the task.
610        set: Option<TableId<WaitableSet>>,
611    },
612    /// Indicates that a new guest task call is pending and may be executed
613    /// using the specified closure.
614    ///
615    /// If the closure returns `Ok(Some(call))`, the `call` should be run
616    /// immediately using `handle_guest_call`.
617    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
618    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
619}
620
621impl fmt::Debug for GuestCallKind {
622    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623        match self {
624            Self::DeliverEvent { instance, set } => f
625                .debug_struct("DeliverEvent")
626                .field("instance", instance)
627                .field("set", set)
628                .finish(),
629            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
630            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
631        }
632    }
633}
634
635/// Represents a pending call into guest code for a given guest thread.
636#[derive(Debug)]
637struct GuestCall {
638    thread: QualifiedThreadId,
639    kind: GuestCallKind,
640}
641
642impl GuestCall {
643    /// Returns whether or not the call is ready to run.
644    ///
645    /// A call will not be ready to run if either:
646    ///
647    /// - the (sub-)component instance to be called has already been entered and
648    /// cannot be reentered until an in-progress call completes
649    ///
650    /// - the call is for a not-yet started task and the (sub-)component
651    /// instance to be called has backpressure enabled
652    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
653        let instance = store
654            .concurrent_state_mut()
655            .get_mut(self.thread.task)?
656            .instance;
657        let state = store.instance_state(instance).concurrent_state();
658
659        let ready = match &self.kind {
660            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
661            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
662            GuestCallKind::StartExplicit(_) => true,
663        };
664        log::trace!(
665            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
666            state.do_not_enter,
667            state.backpressure
668        );
669        Ok(ready)
670    }
671}
672
673/// Job to be run on a worker fiber.
674enum WorkerItem {
675    GuestCall(GuestCall),
676    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
677}
678
679/// Represents a pending work item to be handled by the event loop for a given
680/// component instance.
681enum WorkItem {
682    /// A host task to be pushed to `ConcurrentState::futures`.
683    PushFuture(AlwaysMut<HostTaskFuture>),
684    /// A fiber to resume.
685    ResumeFiber(StoreFiber<'static>),
686    /// A pending call into guest code for a given guest task.
687    GuestCall(GuestCall),
688    /// A job to run on a worker fiber.
689    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
690}
691
692impl fmt::Debug for WorkItem {
693    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
694        match self {
695            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
696            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
697            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
698            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
699        }
700    }
701}
702
703/// Whether a suspension intrinsic was cancelled or completed
704#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
705pub(crate) enum WaitResult {
706    Cancelled,
707    Completed,
708}
709
710/// Poll the specified future until it completes on behalf of a guest->host call
711/// using a sync-lowered import.
712///
713/// This is similar to `Instance::first_poll` except it's for sync-lowered
714/// imports, meaning we don't need to handle cancellation and we can block the
715/// caller until the task completes, at which point the caller can handle
716/// lowering the result to the guest's stack and linear memory.
717pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
718    store: &mut dyn VMStore,
719    future: impl Future<Output = Result<R>> + Send + 'static,
720) -> Result<R> {
721    let state = store.concurrent_state_mut();
722    let task = state.unwrap_current_host_thread();
723
    // Wrap the future in another future which will take care of stashing the result
725    // in `GuestTask::result` and resuming this fiber when the host task
726    // completes.
727    let mut future = Box::pin(async move {
728        let result = future.await?;
729        tls::get(move |store| {
730            let state = store.concurrent_state_mut();
731            state.get_mut(task)?.result = Some(Box::new(result) as _);
732
733            Waitable::Host(task).set_event(
734                state,
735                Some(Event::Subtask {
736                    status: Status::Returned,
737                }),
738            )?;
739
740            Ok(())
741        })
742    }) as HostTaskFuture;
743
744    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
745    // add the future to `ConcurrentState::futures` and poll it automatically
746    // from the event loop if it doesn't complete immediately here.
747    let poll = tls::set(store, || {
748        future
749            .as_mut()
750            .poll(&mut Context::from_waker(&Waker::noop()))
751    });
752
753    match poll {
754        // It completed immediately; check the result and delete the task.
755        Poll::Ready(result) => result?,
756
757        // It did not complete immediately; add it to
758        // `ConcurrentState::futures` so it will be polled via the event loop;
759        // then use `GuestTask::sync_call_set` to wait for the task to
760        // complete, suspending the current fiber until it does so.
761        Poll::Pending => {
762            let state = store.concurrent_state_mut();
763            state.push_future(future);
764
765            let caller = state.get_mut(task)?.caller;
766            let set = state.get_mut(caller.task)?.sync_call_set;
767            Waitable::Host(task).join(state, Some(set))?;
768
769            store.suspend(SuspendReason::Waiting {
770                set,
771                thread: caller,
772                skip_may_block_check: false,
773            })?;
774
775            // Remove the `task` from the `sync_call_set` to ensure that when
776            // this function returns and the task is deleted that there are no
777            // more lingering references to this host task.
778            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
779        }
780    }
781
782    // Retrieve and return the result.
783    Ok(*store
784        .concurrent_state_mut()
785        .get_mut(task)?
786        .result
787        .take()
788        .unwrap()
789        .downcast()
790        .unwrap())
791}
792
793/// Execute the specified guest call.
794fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
795    let mut next = Some(call);
796    while let Some(call) = next.take() {
797        match call.kind {
798            GuestCallKind::DeliverEvent { instance, set } => {
799                let (event, waitable) = instance
800                    .get_event(store, call.thread.task, set, true)?
801                    .unwrap();
802                let state = store.concurrent_state_mut();
803                let task = state.get_mut(call.thread.task)?;
804                let runtime_instance = task.instance;
805                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
806
807                log::trace!(
808                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
809                    call.thread,
810                );
811
812                let old_thread = store.set_thread(call.thread);
813                log::trace!(
814                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
815                    call.thread
816                );
817
818                store.enter_instance(runtime_instance);
819
820                let callback = store
821                    .concurrent_state_mut()
822                    .get_mut(call.thread.task)?
823                    .callback
824                    .take()
825                    .unwrap();
826
827                let code = callback(store, event, handle)?;
828
829                store
830                    .concurrent_state_mut()
831                    .get_mut(call.thread.task)?
832                    .callback = Some(callback);
833
834                store.exit_instance(runtime_instance)?;
835
836                store.set_thread(old_thread);
837
838                next = instance.handle_callback_code(
839                    store,
840                    call.thread,
841                    runtime_instance.index,
842                    code,
843                )?;
844
845                log::trace!(
846                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
847                );
848            }
849            GuestCallKind::StartImplicit(fun) => {
850                next = fun(store)?;
851            }
852            GuestCallKind::StartExplicit(fun) => {
853                fun(store)?;
854            }
855        }
856    }
857
858    Ok(())
859}
860
861impl<T> Store<T> {
862    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
863    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
864    where
865        T: Send + 'static,
866    {
867        ensure!(
868            self.as_context().0.concurrency_support(),
869            "cannot use `run_concurrent` when Config::concurrency_support disabled",
870        );
871        self.as_context_mut().run_concurrent(fun).await
872    }
873
874    #[doc(hidden)]
875    pub fn assert_concurrent_state_empty(&mut self) {
876        self.as_context_mut().assert_concurrent_state_empty();
877    }
878
879    #[doc(hidden)]
880    pub fn concurrent_state_table_size(&mut self) -> usize {
881        self.as_context_mut().concurrent_state_table_size()
882    }
883
884    /// Convenience wrapper for [`StoreContextMut::spawn`].
885    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
886    where
887        T: 'static,
888    {
889        self.as_context_mut().spawn(task)
890    }
891}
892
893impl<T> StoreContextMut<'_, T> {
894    /// Assert that all the relevant tables and queues in the concurrent state
895    /// for this store are empty.
896    ///
897    /// This is for sanity checking in integration tests
898    /// (e.g. `component-async-tests`) that the relevant state has been cleared
899    /// after each test concludes.  This should help us catch leaks, e.g. guest
900    /// tasks which haven't been deleted despite having completed and having
901    /// been dropped by their supertasks.
902    ///
903    /// Only intended for use in Wasmtime's own testing.
904    #[doc(hidden)]
905    pub fn assert_concurrent_state_empty(self) {
906        let store = self.0;
907        store
908            .store_data_mut()
909            .components
910            .assert_instance_states_empty();
911        let state = store.concurrent_state_mut();
912        assert!(
913            state.table.get_mut().is_empty(),
914            "non-empty table: {:?}",
915            state.table.get_mut()
916        );
917        assert!(state.high_priority.is_empty());
918        assert!(state.low_priority.is_empty());
919        assert!(state.current_thread.is_none());
920        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
921        assert!(state.global_error_context_ref_counts.is_empty());
922    }
923
924    /// Helper function to perform tests over the size of the concurrent state
925    /// table which can be useful for detecting leaks.
926    ///
927    /// Only intended for use in Wasmtime's own testing.
928    #[doc(hidden)]
929    pub fn concurrent_state_table_size(&mut self) -> usize {
930        self.0
931            .concurrent_state_mut()
932            .table
933            .get_mut()
934            .iter_mut()
935            .count()
936    }
937
    /// Spawn a background task to run as part of this store's event loop.
    ///
    /// The task will receive an `&Accessor<T>` and run concurrently with
    /// any other tasks in progress for the store.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this store is run.
945    ///
946    /// The returned [`JoinHandle`] may be used to cancel the task.
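    ///
    /// A sketch of typical usage (illustrative only; `LogTask` is an assumed
    /// [`AccessorTask`] implementation):
    ///
    /// ```ignore
    /// // Queue the task; it only makes progress while the event loop is driven.
    /// store.as_context_mut().spawn(LogTask("booted".to_string()));
    ///
    /// // Drive the event loop, during which the spawned task can run.
    /// store.run_concurrent(async |_| { /* other work */ }).await?;
    /// ```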
947    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
948    where
949        T: 'static,
950    {
951        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
952        self.spawn_with_accessor(accessor, task)
953    }
954
955    /// Internal implementation of `spawn` functions where a `store` is
956    /// available along with an `Accessor`.
957    fn spawn_with_accessor<D>(
958        self,
959        accessor: Accessor<T, D>,
960        task: impl AccessorTask<T, D>,
961    ) -> JoinHandle
962    where
963        T: 'static,
964        D: HasData + ?Sized,
965    {
966        // Create an "abortable future" here where internally the future will
967        // hook calls to poll and possibly spawn more background tasks on each
968        // iteration.
969        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
970        self.0
971            .concurrent_state_mut()
972            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
973        handle
974    }
975
976    /// Run the specified closure `fun` to completion as part of this store's
977    /// event loop.
978    ///
979    /// This will run `fun` as part of this store's event loop until it
980    /// yields a result.  `fun` is provided an [`Accessor`], which provides
981    /// controlled access to the store and its data.
982    ///
983    /// This function can be used to invoke [`Func::call_concurrent`] for
984    /// example within the async closure provided here.
985    ///
986    /// This function will unconditionally return an error if
987    /// [`Config::concurrency_support`] is disabled.
988    ///
989    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
990    ///
991    /// # Store-blocking behavior
992    ///
993    /// At this time there are certain situations in which the `Future` returned
994    /// by the `AsyncFnOnce` passed to this function will not be polled for an
995    /// extended period of time, despite one or more `Waker::wake` events having
996    /// occurred for the task to which it belongs.  This can manifest as the
997    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
998    /// the `Store` being held by e.g. a blocking host function, preventing the
999    /// `Future` from being polled. A canonical example of this is when the
1000    /// `fun` provided to this function attempts to set a timeout for an
1001    /// invocation of a wasm function. In this situation the async closure is
1002    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1003    /// to elapse. At this time this setup will not always work and the timeout
1004    /// may not reliably fire.
1005    ///
1006    /// This function will not block the current thread and as such is always
1007    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress before the closure can make progress. This is an
1010    /// artifact of Wasmtime's historical implementation of `async` functions
1011    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1012    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1013    /// progress for a readiness notification of (b) to get delivered.
1014    ///
1015    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which is what timeouts, for
    /// example, are based on. Fixing this requires some relatively major
    /// refactoring work within Wasmtime itself. This is a known pitfall and one
    /// that is intended to be fixed one day. In the meantime it's recommended to
    /// apply timeouts and the like to the entire `run_concurrent` call itself
    /// rather than internally, as sketched below.
1022    ///
1023    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1024    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
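    ///
    /// A sketch of the recommended pattern (illustrative only; it assumes a
    /// Tokio runtime for `tokio::time::timeout` and an already-acquired typed
    /// function handle `func`):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// // Apply the timeout to the whole `run_concurrent` call rather than to
    /// // individual operations inside the closure.
    /// let result = tokio::time::timeout(
    ///     Duration::from_secs(5),
    ///     store.run_concurrent(async |accessor| {
    ///         func.call_concurrent(accessor, ()).await
    ///     }),
    /// )
    /// .await;
    /// ```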
1025    ///
1026    /// # Example
1027    ///
1028    /// ```
1029    /// # use {
1030    /// #   wasmtime::{
1031    /// #     error::{Result},
1032    /// #     component::{ Component, Linker, Resource, ResourceTable},
1033    /// #     Config, Engine, Store
1034    /// #   },
1035    /// # };
1036    /// #
1037    /// # struct MyResource(u32);
1038    /// # struct Ctx { table: ResourceTable }
1039    /// #
1040    /// # async fn foo() -> Result<()> {
1041    /// # let mut config = Config::new();
1042    /// # let engine = Engine::new(&config)?;
1043    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1044    /// # let mut linker = Linker::new(&engine);
1045    /// # let component = Component::new(&engine, "")?;
1046    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1047    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1048    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1049    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1050    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1051    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1052    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1053    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1054    ///    Ok(())
1055    /// }).await??;
1056    /// # Ok(())
1057    /// # }
1058    /// ```
1059    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1060    where
1061        T: Send + 'static,
1062    {
1063        ensure!(
1064            self.0.concurrency_support(),
1065            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1066        );
1067        self.do_run_concurrent(fun, false).await
1068    }
1069
1070    pub(super) async fn run_concurrent_trap_on_idle<R>(
1071        self,
1072        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1073    ) -> Result<R>
1074    where
1075        T: Send + 'static,
1076    {
1077        self.do_run_concurrent(fun, true).await
1078    }
1079
1080    async fn do_run_concurrent<R>(
1081        mut self,
1082        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1083        trap_on_idle: bool,
1084    ) -> Result<R>
1085    where
1086        T: Send + 'static,
1087    {
1088        debug_assert!(self.0.concurrency_support());
1089        check_recursive_run();
1090        let token = StoreToken::new(self.as_context_mut());
1091
1092        struct Dropper<'a, T: 'static, V> {
1093            store: StoreContextMut<'a, T>,
1094            value: ManuallyDrop<V>,
1095        }
1096
1097        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1098            fn drop(&mut self) {
1099                tls::set(self.store.0, || {
1100                    // SAFETY: Here we drop the value without moving it for the
1101                    // first and only time -- per the contract for `Drop::drop`,
1102                    // this code won't run again, and the `value` field will no
1103                    // longer be accessible.
1104                    unsafe { ManuallyDrop::drop(&mut self.value) }
1105                });
1106            }
1107        }
1108
1109        let accessor = &Accessor::new(token);
1110        let dropper = &mut Dropper {
1111            store: self,
1112            value: ManuallyDrop::new(fun(accessor)),
1113        };
1114        // SAFETY: We never move `dropper` nor its `value` field.
1115        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1116
1117        dropper
1118            .store
1119            .as_context_mut()
1120            .poll_until(future, trap_on_idle)
1121            .await
1122    }
1123
1124    /// Run this store's event loop.
1125    ///
1126    /// The returned future will resolve when the specified future completes or,
1127    /// if `trap_on_idle` is true, when the event loop can't make further
1128    /// progress.
1129    async fn poll_until<R>(
1130        mut self,
1131        mut future: Pin<&mut impl Future<Output = R>>,
1132        trap_on_idle: bool,
1133    ) -> Result<R>
1134    where
1135        T: Send + 'static,
1136    {
1137        struct Reset<'a, T: 'static> {
1138            store: StoreContextMut<'a, T>,
1139            futures: Option<FuturesUnordered<HostTaskFuture>>,
1140        }
1141
1142        impl<'a, T> Drop for Reset<'a, T> {
1143            fn drop(&mut self) {
1144                if let Some(futures) = self.futures.take() {
1145                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1146                }
1147            }
1148        }
1149
1150        loop {
1151            // Take `ConcurrentState::futures` out of the store so we can poll
1152            // it while also safely giving any of the futures inside access to
1153            // `self`.
1154            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1155            let mut reset = Reset {
1156                store: self.as_context_mut(),
1157                futures,
1158            };
1159            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1160
1161            enum PollResult<R> {
1162                Complete(R),
1163                ProcessWork(Vec<WorkItem>),
1164            }
1165            let result = future::poll_fn(|cx| {
1166                // First, poll the future we were passed as an argument and
1167                // return immediately if it's ready.
1168                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1169                    return Poll::Ready(Ok(PollResult::Complete(value)));
1170                }
1171
1172                // Next, poll `ConcurrentState::futures` (which includes any
1173                // pending host tasks and/or background tasks), returning
1174                // immediately if one of them fails.
1175                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1176                    Poll::Ready(Some(output)) => {
1177                        match output {
1178                            Err(e) => return Poll::Ready(Err(e)),
1179                            Ok(()) => {}
1180                        }
1181                        Poll::Ready(true)
1182                    }
1183                    Poll::Ready(None) => Poll::Ready(false),
1184                    Poll::Pending => Poll::Pending,
1185                };
1186
1187                // Next, collect the next batch of work items to process, if any.
1188                // This will be either all of the high-priority work items, or if
1189                // there are none, a single low-priority work item.
1190                let state = reset.store.0.concurrent_state_mut();
1191                let ready = state.collect_work_items_to_run();
1192                if !ready.is_empty() {
1193                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1194                }
1195
1196                // Finally, if we have nothing else to do right now, determine what to do
1197                // based on whether there are any pending futures in
1198                // `ConcurrentState::futures`.
1199                return match next {
1200                    Poll::Ready(true) => {
1201                        // In this case, one of the futures in
1202                        // `ConcurrentState::futures` completed
1203                        // successfully, so we return now and continue
1204                        // the outer loop in case there is another one
1205                        // ready to complete.
1206                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1207                    }
1208                    Poll::Ready(false) => {
1209                        // Poll the future we were passed one last time
1210                        // in case one of `ConcurrentState::futures` had
1211                        // the side effect of unblocking it.
1212                        if let Poll::Ready(value) =
1213                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1214                        {
1215                            Poll::Ready(Ok(PollResult::Complete(value)))
1216                        } else {
1217                            // In this case, there are no more pending
1218                            // futures in `ConcurrentState::futures`,
1219                            // there are no remaining work items, _and_
1220                            // the future we were passed as an argument
1221                            // still hasn't completed.
1222                            if trap_on_idle {
1223                                // `trap_on_idle` is true, so we exit
1224                                // immediately.
1225                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1226                            } else {
1227                                // `trap_on_idle` is false, so we assume
1228                                // that future will wake up and give us
1229                                // more work to do when it's ready to.
1230                                Poll::Pending
1231                            }
1232                        }
1233                    }
1234                    // There is at least one pending future in
1235                    // `ConcurrentState::futures` and we have nothing
1236                    // else to do but wait for now, so we return
1237                    // `Pending`.
1238                    Poll::Pending => Poll::Pending,
1239                };
1240            })
1241            .await;
1242
1243            // Put the `ConcurrentState::futures` back into the store before we
1244            // return or handle any work items since one or more of those items
1245            // might append more futures.
1246            drop(reset);
1247
1248            match result? {
1249                // The future we were passed as an argument completed, so we
1250                // return the result.
1251                PollResult::Complete(value) => break Ok(value),
1252                // The future we were passed has not yet completed, so handle
1253                // any work items and then loop again.
1254                PollResult::ProcessWork(ready) => {
1255                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1256                        store: StoreContextMut<'a, T>,
1257                        ready: I,
1258                    }
1259
1260                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1261                        fn drop(&mut self) {
1262                            while let Some(item) = self.ready.next() {
1263                                match item {
1264                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1265                                    WorkItem::PushFuture(future) => {
1266                                        tls::set(self.store.0, move || drop(future))
1267                                    }
1268                                    _ => {}
1269                                }
1270                            }
1271                        }
1272                    }
1273
1274                    let mut dispose = Dispose {
1275                        store: self.as_context_mut(),
1276                        ready: ready.into_iter(),
1277                    };
1278
1279                    while let Some(item) = dispose.ready.next() {
1280                        dispose
1281                            .store
1282                            .as_context_mut()
1283                            .handle_work_item(item)
1284                            .await?;
1285                    }
1286                }
1287            }
1288        }
1289    }
1290
1291    /// Handle the specified work item, possibly resuming a fiber if applicable.
1292    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1293    where
1294        T: Send,
1295    {
1296        log::trace!("handle work item {item:?}");
1297        match item {
1298            WorkItem::PushFuture(future) => {
1299                self.0
1300                    .concurrent_state_mut()
1301                    .futures
1302                    .get_mut()
1303                    .as_mut()
1304                    .unwrap()
1305                    .push(future.into_inner());
1306            }
1307            WorkItem::ResumeFiber(fiber) => {
1308                self.0.resume_fiber(fiber).await?;
1309            }
1310            WorkItem::GuestCall(call) => {
1311                if call.is_ready(self.0)? {
1312                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1313                } else {
1314                    let state = self.0.concurrent_state_mut();
1315                    let task = state.get_mut(call.thread.task)?;
1316                    if !task.starting_sent {
1317                        task.starting_sent = true;
1318                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1319                            Waitable::Guest(call.thread.task).set_event(
1320                                state,
1321                                Some(Event::Subtask {
1322                                    status: Status::Starting,
1323                                }),
1324                            )?;
1325                        }
1326                    }
1327
1328                    let instance = state.get_mut(call.thread.task)?.instance;
1329                    self.0
1330                        .instance_state(instance)
1331                        .concurrent_state()
1332                        .pending
1333                        .insert(call.thread, call.kind);
1334                }
1335            }
1336            WorkItem::WorkerFunction(fun) => {
1337                self.run_on_worker(WorkerItem::Function(fun)).await?;
1338            }
1339        }
1340
1341        Ok(())
1342    }
1343
    /// Execute the specified work item on a worker fiber.
1345    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1346    where
1347        T: Send,
1348    {
1349        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1350            fiber
1351        } else {
1352            fiber::make_fiber(self.0, move |store| {
1353                loop {
1354                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1355                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1356                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1357                    }
1358
1359                    store.suspend(SuspendReason::NeedWork)?;
1360                }
1361            })?
1362        };
1363
1364        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1365        assert!(worker_item.is_none());
1366        *worker_item = Some(item);
1367
1368        self.0.resume_fiber(worker).await
1369    }
1370
1371    /// Wrap the specified host function in a future which will call it, passing
1372    /// it an `&Accessor<T>`.
1373    ///
1374    /// See the `Accessor` documentation for details.
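    ///
    /// A minimal, illustrative sketch of the closure shape this expects
    /// (`MyData` here is a hypothetical host data type, not something defined
    /// in this module):
    ///
    /// ```ignore
    /// let future = store.wrap_call(|accessor: &Accessor<MyData>| {
    ///     Box::pin(async move {
    ///         // `accessor` may be used to reach the store between (but not
    ///         // across) await points; see the `Accessor` docs for the API.
    ///         Ok(42u32)
    ///     })
    /// });
    /// ```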
1375    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1376    where
1377        T: 'static,
1378        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1379            + Send
1380            + Sync
1381            + 'static,
1382        R: Send + Sync + 'static,
1383    {
1384        let token = StoreToken::new(self);
1385        async move {
1386            let mut accessor = Accessor::new(token);
1387            closure(&mut accessor).await
1388        }
1389    }
1390}
1391
1392impl StoreOpaque {
1393    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1394    /// guest-to-guest call or a sync host-to-guest call.
1395    ///
1396    /// This task will only be used for the purpose of handling calls to
1397    /// intrinsic functions; both parameter lowering and result lifting are
1398    /// assumed to be taken care of elsewhere.
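    ///
    /// A rough sketch of the intended pairing with `Self::exit_guest_sync_call`
    /// (illustrative only; parameter lowering and result lifting happen
    /// elsewhere, as noted above):
    ///
    /// ```ignore
    /// store.enter_guest_sync_call(None, false, callee)?;
    /// // ... lower parameters, run the call, lift the result ...
    /// store.exit_guest_sync_call(false)?;
    /// ```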
1399    pub(crate) fn enter_guest_sync_call(
1400        &mut self,
1401        guest_caller: Option<RuntimeInstance>,
1402        callee_async: bool,
1403        callee: RuntimeInstance,
1404    ) -> Result<()> {
1405        log::trace!("enter sync call {callee:?}");
1406        if !self.concurrency_support() {
1407            return Ok(self.enter_call_not_concurrent());
1408        }
1409
1410        let state = self.concurrent_state_mut();
1411        let thread = state.current_thread;
1412        let instance = if let Some(thread) = thread.guest() {
1413            Some(state.get_mut(thread.task)?.instance)
1414        } else {
1415            None
1416        };
1417        let task = GuestTask::new(
1418            state,
1419            Box::new(move |_, _| unreachable!()),
1420            LiftResult {
1421                lift: Box::new(move |_, _| unreachable!()),
1422                ty: TypeTupleIndex::reserved_value(),
1423                memory: None,
1424                string_encoding: StringEncoding::Utf8,
1425            },
1426            if let Some(caller) = guest_caller {
1427                assert_eq!(caller, instance.unwrap());
1428                Caller::Guest {
1429                    thread: *thread.guest().unwrap(),
1430                }
1431            } else {
1432                Caller::Host {
1433                    tx: None,
1434                    exit_tx: Arc::new(oneshot::channel().0),
1435                    host_future_present: false,
1436                    caller: thread,
1437                }
1438            },
1439            None,
1440            callee,
1441            callee_async,
1442        )?;
1443
1444        let guest_task = state.push(task)?;
1445        let new_thread = GuestThread::new_implicit(guest_task);
1446        let guest_thread = state.push(new_thread)?;
1447        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1448            guest_thread,
1449            self,
1450            callee.index,
1451        )?;
1452
1453        let state = self.concurrent_state_mut();
1454        state.get_mut(guest_task)?.threads.insert(guest_thread);
1455        if guest_caller.is_some() {
1456            let thread = thread.guest().unwrap();
1457            state.get_mut(thread.task)?.subtasks.insert(guest_task);
1458        }
1459
1460        self.set_thread(QualifiedThreadId {
1461            task: guest_task,
1462            thread: guest_thread,
1463        });
1464
1465        Ok(())
1466    }
1467
1468    /// Pop a `GuestTask` previously pushed using `enter_guest_sync_call`.
1469    pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1470        if !self.concurrency_support() {
1471            return Ok(self.exit_call_not_concurrent());
1472        }
1473        let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1474        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1475        log::trace!("exit sync call {instance:?}");
1476        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1477            self,
1478            thread,
1479            instance.index,
1480        )?;
1481
1482        let state = self.concurrent_state_mut();
1483        let task = state.get_mut(thread.task)?;
1484        let caller = match &task.caller {
1485            &Caller::Guest { thread } => {
1486                assert!(guest_caller);
1487                thread.into()
1488            }
1489            &Caller::Host { caller, .. } => {
1490                assert!(!guest_caller);
1491                caller
1492            }
1493        };
1494        self.set_thread(caller);
1495
1496        let state = self.concurrent_state_mut();
1497        let task = state.get_mut(thread.task)?;
1498        if task.ready_to_delete() {
1499            state.delete(thread.task)?.dispose(state, thread.task)?;
1500        }
1501
1502        Ok(())
1503    }
1504
1505    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1506    /// transition to the host.
1507    ///
1508    /// FIXME: this is called for all guest->host transitions and performs some
1509    /// relatively expensive table manipulations. This would ideally be
1510    /// optimized to avoid the full allocation of a `HostTask` in at least some
1511    /// situations.
1512    pub fn enter_host_call(&mut self) -> Result<()> {
1513        let state = self.concurrent_state_mut();
1514        let caller = state.unwrap_current_guest_thread();
1515        let task = state.push(HostTask::new(caller))?;
1516        log::trace!("new host task {task:?}");
1517        self.set_thread(task);
1518        Ok(())
1519    }
1520
1521    /// Dual of `enter_host_call`; signifies that the host call has finished
1522    /// and the host task will be cleaned up.
1523    ///
1524    /// Note that this isn't invoked when the host function is called
1525    /// asynchronously and hasn't completed yet. In that situation the host
1526    /// task persists and will be cleaned up separately.
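    ///
    /// A sketch of the expected pairing with `Self::enter_host_call`
    /// (illustrative only):
    ///
    /// ```ignore
    /// store.enter_host_call()?;
    /// // ... run the synchronous host code ...
    /// store.exit_host_call()?;
    /// ```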
1527    pub fn exit_host_call(&mut self) -> Result<()> {
1528        let task = self.concurrent_state_mut().unwrap_current_host_thread();
1529        log::trace!("delete host task {task:?}");
1530        let task = self.concurrent_state_mut().delete(task)?;
1531        self.set_thread(task.caller);
1532        Ok(())
1533    }
1534
1535    /// Determine whether the specified instance may be entered from the host.
1536    ///
1537    /// We return `true` here only if all of the following hold:
1538    ///
1539    /// - The top-level instance is not already on the current task's call stack.
1540    /// - The instance is not in need of a post-return function call.
1541    /// - `self` has not been poisoned due to a trap.
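    ///
    /// A sketch of the intended use before starting a host->guest call
    /// (illustrative only; the error message here is hypothetical):
    ///
    /// ```ignore
    /// if !store.may_enter(instance) {
    ///     bail!("cannot enter component instance from the host");
    /// }
    /// ```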
1542    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1543        if self.trapped() {
1544            return false;
1545        }
1546        if !self.concurrency_support() {
1547            return true;
1548        }
1549        let state = self.concurrent_state_mut();
1550        let mut cur = state.current_thread;
1551        loop {
1552            match cur {
1553                CurrentThread::None => break true,
1554                CurrentThread::Guest(thread) => {
1555                    let task = state.get_mut(thread.task).unwrap();
1556
1557                    // Note that we only compare top-level instance IDs here.
1558                    // The idea is that the host is not allowed to recursively
1559                    // enter a top-level instance even if the specific leaf
1560                    // instance is not on the stack. This is the behavior defined
1561                    // in the spec, and it allows us to elide runtime checks in
1562                    // guest-to-guest adapters.
1563                    if task.instance.instance == instance.instance {
1564                        break false;
1565                    }
1566                    cur = match task.caller {
1567                        Caller::Host { caller, .. } => caller,
1568                        Caller::Guest { thread } => thread.into(),
1569                    };
1570                }
1571                CurrentThread::Host(id) => {
1572                    cur = state.get_mut(id).unwrap().caller.into();
1573                }
1574            }
1575        }
1576    }
1577
1578    /// Helper function to retrieve the `InstanceState` for the
1579    /// specified instance.
1580    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1581        self.component_instance_mut(instance.instance)
1582            .instance_state(instance.index)
1583    }
1584
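    /// Switch `ConcurrentState::current_thread` to `thread`, updating the
    /// affected component instances' `task_may_block` flags along the way, and
    /// return the previously current thread.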
1585    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1586        // Each time we switch threads, we conservatively set `task_may_block`
1587        // to `false` for the component instance we're switching away from (if
1588        // any), meaning it will be `false` for any new thread created for that
1589        // instance unless explicitly set otherwise.
1590        let state = self.concurrent_state_mut();
1591        let old_thread = mem::replace(&mut state.current_thread, thread.into());
1592        if let Some(old_thread) = old_thread.guest() {
1593            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1594            self.component_instance_mut(instance)
1595                .set_task_may_block(false)
1596        }
1597
1598        // If we're switching to a new thread, set its component instance's
1599        // `task_may_block` according to where it left off.
1600        if self.concurrent_state_mut().current_thread.guest().is_some() {
1601            self.set_task_may_block();
1602        }
1603
1604        old_thread
1605    }
1606
1607    /// Set the global variable representing whether the current task may block
1608    /// prior to entering Wasm code.
1609    fn set_task_may_block(&mut self) {
1610        let state = self.concurrent_state_mut();
1611        let guest_thread = state.unwrap_current_guest_thread();
1612        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1613        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1614        self.component_instance_mut(instance)
1615            .set_task_may_block(may_block)
1616    }
1617
1618    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1619        if !self.concurrency_support() {
1620            return Ok(());
1621        }
1622        let state = self.concurrent_state_mut();
1623        let task = state.unwrap_current_guest_thread().task;
1624        let instance = state.get_mut(task).unwrap().instance.instance;
1625        let task_may_block = self.component_instance(instance).get_task_may_block();
1626
1627        if task_may_block {
1628            Ok(())
1629        } else {
1630            Err(Trap::CannotBlockSyncTask.into())
1631        }
1632    }
1633
1634    /// Record that we're about to enter a (sub-)component instance which does
1635    /// not support more than one concurrent, stackful activation, meaning it
1636    /// cannot be entered again until the next call returns.
1637    fn enter_instance(&mut self, instance: RuntimeInstance) {
1638        log::trace!("enter {instance:?}");
1639        self.instance_state(instance)
1640            .concurrent_state()
1641            .do_not_enter = true;
1642    }
1643
1644    /// Record that we've exited a (sub-)component instance previously entered
1645    /// with `Self::enter_instance`, then call `Self::partition_pending`.
1646    /// See the documentation for the latter for details.
1647    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1648        log::trace!("exit {instance:?}");
1649        self.instance_state(instance)
1650            .concurrent_state()
1651            .do_not_enter = false;
1652        self.partition_pending(instance)
1653    }
1654
1655    /// Iterate over `InstanceState::pending`, moving any ready items into the
1656    /// "high priority" work item queue.
1657    ///
1658    /// See `GuestCall::is_ready` for details.
1659    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1660        for (thread, kind) in
1661            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1662        {
1663            let call = GuestCall { thread, kind };
1664            if call.is_ready(self)? {
1665                self.concurrent_state_mut()
1666                    .push_high_priority(WorkItem::GuestCall(call));
1667            } else {
1668                self.instance_state(instance)
1669                    .concurrent_state()
1670                    .pending
1671                    .insert(call.thread, call.kind);
1672            }
1673        }
1674
1675        Ok(())
1676    }
1677
1678    /// Implements the `backpressure.{inc,dec}` intrinsics.
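    ///
    /// The intrinsic implementations are assumed to pass closures along the
    /// lines of the following sketch (illustrative only):
    ///
    /// ```ignore
    /// // backpressure.inc
    /// store.backpressure_modify(caller_instance, |n| n.checked_add(1))?;
    /// // backpressure.dec
    /// store.backpressure_modify(caller_instance, |n| n.checked_sub(1))?;
    /// ```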
1679    pub(crate) fn backpressure_modify(
1680        &mut self,
1681        caller_instance: RuntimeInstance,
1682        modify: impl FnOnce(u16) -> Option<u16>,
1683    ) -> Result<()> {
1684        let state = self.instance_state(caller_instance).concurrent_state();
1685        let old = state.backpressure;
1686        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1687        state.backpressure = new;
1688
1689        if old > 0 && new == 0 {
1690            // Backpressure was previously enabled and is now disabled; move any
1691            // newly-eligible guest calls to the "high priority" queue.
1692            self.partition_pending(caller_instance)?;
1693        }
1694
1695        Ok(())
1696    }
1697
1698    /// Resume the specified fiber, giving it exclusive access to this
1699    /// store.
1700    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1701        let old_thread = self.concurrent_state_mut().current_thread;
1702        log::trace!("resume_fiber: save current thread {old_thread:?}");
1703
1704        let fiber = fiber::resolve_or_release(self, fiber).await?;
1705
1706        self.set_thread(old_thread);
1707
1708        let state = self.concurrent_state_mut();
1709
1710        if let Some(ot) = old_thread.guest() {
1711            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1712        }
1713        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1714
1715        if let Some(mut fiber) = fiber {
1716            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1717            // See the `SuspendReason` documentation for what each case means.
1718            match state.suspend_reason.take().unwrap() {
1719                SuspendReason::NeedWork => {
1720                    if state.worker.is_none() {
1721                        state.worker = Some(fiber);
1722                    } else {
1723                        fiber.dispose(self);
1724                    }
1725                }
1726                SuspendReason::Yielding { thread, .. } => {
1727                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1728                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1729                }
1730                SuspendReason::ExplicitlySuspending { thread, .. } => {
1731                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1732                }
1733                SuspendReason::Waiting { set, thread, .. } => {
1734                    let old = state
1735                        .get_mut(set)?
1736                        .waiting
1737                        .insert(thread, WaitMode::Fiber(fiber));
1738                    assert!(old.is_none());
1739                }
1740            };
1741        } else {
1742            log::trace!("resume_fiber: fiber has exited");
1743        }
1744
1745        Ok(())
1746    }
1747
1748    /// Suspend the current fiber, storing the reason in
1749    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1750    /// it should be resumed.
1751    ///
1752    /// See the `SuspendReason` documentation for details.
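    ///
    /// For example, the worker fiber loop in `run_on_worker` suspends with
    /// `SuspendReason::NeedWork` after finishing each item, while
    /// `wait_for_event` suspends with a `SuspendReason::Waiting` naming the
    /// waitable set it needs an event from:
    ///
    /// ```ignore
    /// store.suspend(SuspendReason::NeedWork)?;
    /// store.suspend(SuspendReason::Waiting { set, thread: caller, skip_may_block_check: false })?;
    /// ```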
1753    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1754        log::trace!("suspend fiber: {reason:?}");
1755
1756        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1757        // pop the call context which manages resource borrows before suspending
1758        // and then push it again once we've resumed.
1759        let task = match &reason {
1760            SuspendReason::Yielding { thread, .. }
1761            | SuspendReason::Waiting { thread, .. }
1762            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1763            SuspendReason::NeedWork => None,
1764        };
1765
1766        let old_guest_thread = if task.is_some() {
1767            self.concurrent_state_mut().current_thread
1768        } else {
1769            CurrentThread::None
1770        };
1771
1772        // We should not have reached here unless either there's no current
1773        // task, or the current task is permitted to block.  In addition, we
1774        // special-case `thread.switch-to` and waiting for a subtask to go from
1775        // `starting` to `started`, both of which we consider non-blocking
1776        // operations despite requiring a suspend.
1777        assert!(
1778            matches!(
1779                reason,
1780                SuspendReason::ExplicitlySuspending {
1781                    skip_may_block_check: true,
1782                    ..
1783                } | SuspendReason::Waiting {
1784                    skip_may_block_check: true,
1785                    ..
1786                } | SuspendReason::Yielding {
1787                    skip_may_block_check: true,
1788                    ..
1789                }
1790            ) || old_guest_thread
1791                .guest()
1792                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1793                .unwrap_or(true)
1794        );
1795
1796        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1797        assert!(suspend_reason.is_none());
1798        *suspend_reason = Some(reason);
1799
1800        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1801
1802        if task.is_some() {
1803            self.set_thread(old_guest_thread);
1804        }
1805
1806        Ok(())
1807    }
1808
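    /// Temporarily join `waitable` to the current guest task's
    /// `sync_call_set`, suspend until an event arrives for that set, then
    /// restore the waitable's original set membership.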
1809    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1810        let state = self.concurrent_state_mut();
1811        let caller = state.unwrap_current_guest_thread();
1812        let old_set = waitable.common(state)?.set;
1813        let set = state.get_mut(caller.task)?.sync_call_set;
1814        waitable.join(state, Some(set))?;
1815        self.suspend(SuspendReason::Waiting {
1816            set,
1817            thread: caller,
1818            skip_may_block_check: false,
1819        })?;
1820        let state = self.concurrent_state_mut();
1821        waitable.join(state, old_set)
1822    }
1823}
1824
1825impl Instance {
1826    /// Get the next pending event for the specified task and (optional)
1827    /// waitable set, along with the waitable handle if applicable.
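    ///
    /// A sketch of how a caller might interpret the return value
    /// (illustrative only):
    ///
    /// ```ignore
    /// match instance.get_event(store, guest_task, Some(set), true)? {
    ///     Some((event, Some((waitable, handle)))) => {
    ///         // Event for a specific waitable; `handle` is the guest's handle.
    ///     }
    ///     Some((event, None)) => {
    ///         // Task-level event with no associated waitable.
    ///     }
    ///     None => {
    ///         // Nothing ready yet.
    ///     }
    /// }
    /// ```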
1828    fn get_event(
1829        self,
1830        store: &mut StoreOpaque,
1831        guest_task: TableId<GuestTask>,
1832        set: Option<TableId<WaitableSet>>,
1833        cancellable: bool,
1834    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1835        let state = store.concurrent_state_mut();
1836
1837        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1838            log::trace!("deliver event {event:?} to {guest_task:?}");
1839
1840            if cancellable || !matches!(event, Event::Cancelled) {
1841                return Ok(Some((event, None)));
1842            } else {
1843                state.get_mut(guest_task)?.event = Some(event);
1844            }
1845        }
1846
1847        Ok(
1848            if let Some((set, waitable)) = set
1849                .and_then(|set| {
1850                    state
1851                        .get_mut(set)
1852                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1853                        .transpose()
1854                })
1855                .transpose()?
1856            {
1857                let common = waitable.common(state)?;
1858                let handle = common.handle.unwrap();
1859                let event = common.event.take().unwrap();
1860
1861                log::trace!(
1862                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1863                );
1864
1865                waitable.on_delivery(store, self, event);
1866
1867                Some((event, Some((waitable, handle))))
1868            } else {
1869                None
1870            },
1871        )
1872    }
1873
1874    /// Handle the `CallbackCode` returned from an async-lifted export or its
1875    /// callback.
1876    ///
1877    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1878    /// using `handle_guest_call`.
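    ///
    /// A sketch of the expected calling pattern (illustrative only):
    ///
    /// ```ignore
    /// if let Some(call) = instance.handle_callback_code(store, thread, index, code)? {
    ///     handle_guest_call(store, call)?;
    /// }
    /// ```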
1879    fn handle_callback_code(
1880        self,
1881        store: &mut StoreOpaque,
1882        guest_thread: QualifiedThreadId,
1883        runtime_instance: RuntimeComponentInstanceIndex,
1884        code: u32,
1885    ) -> Result<Option<GuestCall>> {
1886        let (code, set) = unpack_callback_code(code);
1887
1888        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1889
1890        let state = store.concurrent_state_mut();
1891
1892        let get_set = |store: &mut StoreOpaque, handle| {
1893            if handle == 0 {
1894                bail!("invalid waitable-set handle");
1895            }
1896
1897            let set = store
1898                .instance_state(RuntimeInstance {
1899                    instance: self.id().instance(),
1900                    index: runtime_instance,
1901                })
1902                .handle_table()
1903                .waitable_set_rep(handle)?;
1904
1905            Ok(TableId::<WaitableSet>::new(set))
1906        };
1907
1908        Ok(match code {
1909            callback_code::EXIT => {
1910                log::trace!("implicit thread {guest_thread:?} completed");
1911                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1912                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1913                if task.threads.is_empty() && !task.returned_or_cancelled() {
1914                    bail!(Trap::NoAsyncResult);
1915                }
1916                match &task.caller {
1917                    Caller::Host { .. } => {
1918                        if task.ready_to_delete() {
1919                            Waitable::Guest(guest_thread.task)
1920                                .delete_from(store.concurrent_state_mut())?;
1921                        }
1922                    }
1923                    Caller::Guest { .. } => {
1924                        task.exited = true;
1925                        task.callback = None;
1926                    }
1927                }
1928                None
1929            }
1930            callback_code::YIELD => {
1931                let task = state.get_mut(guest_thread.task)?;
1932                // If an `Event::Cancelled` is pending, we'll deliver that;
1933                // otherwise, we'll deliver `Event::None`.  Note that
1934                // `GuestTask::event` is only ever set to one of those two
1935                // `Event` variants.
1936                if let Some(event) = task.event {
1937                    assert!(matches!(event, Event::None | Event::Cancelled));
1938                } else {
1939                    task.event = Some(Event::None);
1940                }
1941                let call = GuestCall {
1942                    thread: guest_thread,
1943                    kind: GuestCallKind::DeliverEvent {
1944                        instance: self,
1945                        set: None,
1946                    },
1947                };
1948                if state.may_block(guest_thread.task) {
1949                    // Push this thread onto the "low priority" queue so it runs
1950                    // after any other threads have had a chance to run.
1951                    state.push_low_priority(WorkItem::GuestCall(call));
1952                    None
1953                } else {
1954                    // Yielding in a non-blocking context is defined as a no-op
1955                    // according to the spec, so we must run this thread
1956                    // immediately without allowing any others to run.
1957                    Some(call)
1958                }
1959            }
1960            callback_code::WAIT => {
1961                // The task may only return `WAIT` if it was created for a call
1962                // to an async export.  Otherwise, we'll trap.
1963                state.check_blocking_for(guest_thread.task)?;
1964
1965                let set = get_set(store, set)?;
1966                let state = store.concurrent_state_mut();
1967
1968                if state.get_mut(guest_thread.task)?.event.is_some()
1969                    || !state.get_mut(set)?.ready.is_empty()
1970                {
1971                    // An event is immediately available; deliver it ASAP.
1972                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1973                        thread: guest_thread,
1974                        kind: GuestCallKind::DeliverEvent {
1975                            instance: self,
1976                            set: Some(set),
1977                        },
1978                    }));
1979                } else {
1980                    // No event is immediately available.
1981                    //
1982                    // We're waiting, so register to be woken up when an event
1983                    // is published for this waitable set.
1984                    //
1985                    // Here we also set `GuestTask::wake_on_cancel` which allows
1986                    // `subtask.cancel` to interrupt the wait.
1987                    let old = state
1988                        .get_mut(guest_thread.thread)?
1989                        .wake_on_cancel
1990                        .replace(set);
1991                    assert!(old.is_none());
1992                    let old = state
1993                        .get_mut(set)?
1994                        .waiting
1995                        .insert(guest_thread, WaitMode::Callback(self));
1996                    assert!(old.is_none());
1997                }
1998                None
1999            }
2000            _ => bail!("unsupported callback code: {code}"),
2001        })
2002    }
2003
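    /// Remove the specified guest thread from the instance's thread handle
    /// table, delete it from the concurrent state, and remove it from its
    /// task's set of threads.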
2004    fn cleanup_thread(
2005        self,
2006        store: &mut StoreOpaque,
2007        guest_thread: QualifiedThreadId,
2008        runtime_instance: RuntimeComponentInstanceIndex,
2009    ) -> Result<()> {
2010        let guest_id = store
2011            .concurrent_state_mut()
2012            .get_mut(guest_thread.thread)?
2013            .instance_rep;
2014        store
2015            .instance_state(RuntimeInstance {
2016                instance: self.id().instance(),
2017                index: runtime_instance,
2018            })
2019            .thread_handle_table()
2020            .guest_thread_remove(guest_id.unwrap())?;
2021
2022        store.concurrent_state_mut().delete(guest_thread.thread)?;
2023        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2024        task.threads.remove(&guest_thread.thread);
2025        Ok(())
2026    }
2027
2028    /// Add the specified guest call to the "high priority" work item queue, to
2029    /// be started as soon as backpressure and/or reentrance rules allow.
2030    ///
2031    /// SAFETY: The raw pointer arguments must be valid references to guest
2032    /// functions (with the appropriate signatures) when the closures queued by
2033    /// this function are called.
2034    unsafe fn queue_call<T: 'static>(
2035        self,
2036        mut store: StoreContextMut<T>,
2037        guest_thread: QualifiedThreadId,
2038        callee: SendSyncPtr<VMFuncRef>,
2039        param_count: usize,
2040        result_count: usize,
2041        async_: bool,
2042        callback: Option<SendSyncPtr<VMFuncRef>>,
2043        post_return: Option<SendSyncPtr<VMFuncRef>>,
2044    ) -> Result<()> {
2045        /// Return a closure which will call the specified function in the scope
2046        /// of the specified task.
2047        ///
2048        /// This will use `GuestTask::lower_params` to lower the parameters, but
2049        /// will not lift the result; instead, it returns a
2050        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2051        /// any, may be lifted.  Note that an async-lifted export will have
2052        /// returned its result using the `task.return` intrinsic (or not
2053        /// returned a result at all, in the case of `task.cancel`), in which
2054        /// case the "result" of this call will either be a callback code or
2055        /// nothing.
2056        ///
2057        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2058        /// the returned closure is called.
2059        unsafe fn make_call<T: 'static>(
2060            store: StoreContextMut<T>,
2061            guest_thread: QualifiedThreadId,
2062            callee: SendSyncPtr<VMFuncRef>,
2063            param_count: usize,
2064            result_count: usize,
2065        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2066        + Send
2067        + Sync
2068        + 'static
2069        + use<T> {
2070            let token = StoreToken::new(store);
2071            move |store: &mut dyn VMStore| {
2072                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2073
2074                store
2075                    .concurrent_state_mut()
2076                    .get_mut(guest_thread.thread)?
2077                    .state = GuestThreadState::Running;
2078                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2079                let lower = task.lower_params.take().unwrap();
2080
2081                lower(store, &mut storage[..param_count])?;
2082
2083                let mut store = token.as_context_mut(store);
2084
2085                // SAFETY: Per the contract described in `make_call`'s
2086                // documentation, `callee` must be a valid pointer.
2087                unsafe {
2088                    crate::Func::call_unchecked_raw(
2089                        &mut store,
2090                        callee.as_non_null(),
2091                        NonNull::new(
2092                            &mut storage[..param_count.max(result_count)]
2093                                as *mut [MaybeUninit<ValRaw>] as _,
2094                        )
2095                        .unwrap(),
2096                    )?;
2097                }
2098
2099                Ok(storage)
2100            }
2101        }
2102
2103        // SAFETY: Per the contract described in this function documentation,
2104        // the `callee` pointer which `call` closes over must be valid when
2105        // called by the closure we queue below.
2106        let call = unsafe {
2107            make_call(
2108                store.as_context_mut(),
2109                guest_thread,
2110                callee,
2111                param_count,
2112                result_count,
2113            )
2114        };
2115
2116        let callee_instance = store
2117            .0
2118            .concurrent_state_mut()
2119            .get_mut(guest_thread.task)?
2120            .instance;
2121
2122        let fun = if callback.is_some() {
2123            assert!(async_);
2124
2125            Box::new(move |store: &mut dyn VMStore| {
2126                self.add_guest_thread_to_instance_table(
2127                    guest_thread.thread,
2128                    store,
2129                    callee_instance.index,
2130                )?;
2131                let old_thread = store.set_thread(guest_thread);
2132                log::trace!(
2133                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2134                );
2135
2136                store.enter_instance(callee_instance);
2137
2138                // SAFETY: See the documentation for `make_call` to review the
2139                // contract we must uphold for `call` here.
2140                //
2141                // Per the contract described in the `queue_call`
2142                // documentation, the `callee` pointer which `call` closes
2143                // over must be valid.
2144                let storage = call(store)?;
2145
2146                store.exit_instance(callee_instance)?;
2147
2148                store.set_thread(old_thread);
2149                let state = store.concurrent_state_mut();
2150                old_thread
2151                    .guest()
2152                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2153                log::trace!("stackless call: restored {old_thread:?} as current thread");
2154
2155                // SAFETY: `wasmparser` will have validated that the callback
2156                // function returns a `i32` result.
2157                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2158
2159                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2160            })
2161                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2162        } else {
2163            let token = StoreToken::new(store.as_context_mut());
2164            Box::new(move |store: &mut dyn VMStore| {
2165                self.add_guest_thread_to_instance_table(
2166                    guest_thread.thread,
2167                    store,
2168                    callee_instance.index,
2169                )?;
2170                let old_thread = store.set_thread(guest_thread);
2171                log::trace!(
2172                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2173                );
2174                let flags = self.id().get(store).instance_flags(callee_instance.index);
2175
2176                // Unless this is a callback-less (i.e. stackful)
2177                // async-lifted export, we need to record that the instance
2178                // cannot be entered until the call returns.
2179                if !async_ {
2180                    store.enter_instance(callee_instance);
2181                }
2182
2183                // SAFETY: See the documentation for `make_call` to review the
2184                // contract we must uphold for `call` here.
2185                //
2186                // Per the contract described in the `queue_call`
2187                // documentation, the `callee` pointer which `call` closes
2188                // over must be valid.
2189                let storage = call(store)?;
2190
2191                if async_ {
2192                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2193                    if task.threads.len() == 1 && !task.returned_or_cancelled() {
2194                        bail!(Trap::NoAsyncResult);
2195                    }
2196                } else {
2197                    // This is a sync-lifted export, so now is when we lift the
2198                    // result, optionally call the post-return function, if any,
2199                    // and finally notify any current or future waiters that the
2200                    // subtask has returned.
2201
2202                    let lift = {
2203                        store.exit_instance(callee_instance)?;
2204
2205                        let state = store.concurrent_state_mut();
2206                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2207
2208                        state
2209                            .get_mut(guest_thread.task)?
2210                            .lift_result
2211                            .take()
2212                            .unwrap()
2213                    };
2214
2215                    // SAFETY: `result_count` represents the number of core Wasm
2216                    // results returned, per `wasmparser`.
2217                    let result = (lift.lift)(store, unsafe {
2218                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2219                            &storage[..result_count],
2220                        )
2221                    })?;
2222
2223                    let post_return_arg = match result_count {
2224                        0 => ValRaw::i32(0),
2225                        // SAFETY: `result_count` represents the number of
2226                        // core Wasm results returned, per `wasmparser`.
2227                        1 => unsafe { storage[0].assume_init() },
2228                        _ => unreachable!(),
2229                    };
2230
2231                    unsafe {
2232                        call_post_return(
2233                            token.as_context_mut(store),
2234                            post_return.map(|v| v.as_non_null()),
2235                            post_return_arg,
2236                            flags,
2237                        )?;
2238                    }
2239
2240                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2241                }
2242
2243                // This is a callback-less call, so the implicit thread has now completed
2244                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2245
2246                store.set_thread(old_thread);
2247
2248                let state = store.concurrent_state_mut();
2249                let task = state.get_mut(guest_thread.task)?;
2250
2251                match &task.caller {
2252                    Caller::Host { .. } => {
2253                        if task.ready_to_delete() {
2254                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2255                        }
2256                    }
2257                    Caller::Guest { .. } => {
2258                        task.exited = true;
2259                    }
2260                }
2261
2262                Ok(None)
2263            })
2264        };
2265
2266        store
2267            .0
2268            .concurrent_state_mut()
2269            .push_high_priority(WorkItem::GuestCall(GuestCall {
2270                thread: guest_thread,
2271                kind: GuestCallKind::StartImplicit(fun),
2272            }));
2273
2274        Ok(())
2275    }
2276
2277    /// Prepare (but do not start) a guest->guest call.
2278    ///
2279    /// This is called from fused adapter code generated in
2280    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2281    /// are synthesized Wasm functions which move the parameters from the caller
2282    /// to the callee and the result from the callee to the caller,
2283    /// respectively.  The adapter will call `Self::start_call` immediately
2284    /// after calling this function.
2285    ///
2286    /// SAFETY: All the pointer arguments must be valid pointers to guest
2287    /// entities (and with the expected signatures for the function references
2288    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
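    ///
    /// Conceptually, the generated adapter performs something like the
    /// following sequence (illustrative only; the real call sites are emitted
    /// by `wasmtime_environ::fact::trampoline::Compiler`):
    ///
    /// ```ignore
    /// unsafe {
    ///     instance.prepare_call(store, start, return_, /* ... */)?;
    ///     instance.start_call(store, callback, post_return, callee, /* ... */)?;
    /// }
    /// ```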
2289    unsafe fn prepare_call<T: 'static>(
2290        self,
2291        mut store: StoreContextMut<T>,
2292        start: *mut VMFuncRef,
2293        return_: *mut VMFuncRef,
2294        caller_instance: RuntimeComponentInstanceIndex,
2295        callee_instance: RuntimeComponentInstanceIndex,
2296        task_return_type: TypeTupleIndex,
2297        callee_async: bool,
2298        memory: *mut VMMemoryDefinition,
2299        string_encoding: u8,
2300        caller_info: CallerInfo,
2301    ) -> Result<()> {
2302        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2303            // A task may only call an async-typed function via a sync lower if
2304            // it was created by a call to an async export.  Otherwise, we'll
2305            // trap.
2306            store.0.check_blocking()?;
2307        }
2308
2309        enum ResultInfo {
2310            Heap { results: u32 },
2311            Stack { result_count: u32 },
2312        }
2313
2314        let result_info = match &caller_info {
2315            CallerInfo::Async {
2316                has_result: true,
2317                params,
2318            } => ResultInfo::Heap {
2319                results: params.last().unwrap().get_u32(),
2320            },
2321            CallerInfo::Async {
2322                has_result: false, ..
2323            } => ResultInfo::Stack { result_count: 0 },
2324            CallerInfo::Sync {
2325                result_count,
2326                params,
2327            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2328                results: params.last().unwrap().get_u32(),
2329            },
2330            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2331                result_count: *result_count,
2332            },
2333        };
2334
2335        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2336
2337        // Create a new guest task for the call, closing over the `start` and
2338        // `return_` functions to lift the parameters and lower the result,
2339        // respectively.
2340        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2341        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2342        let token = StoreToken::new(store.as_context_mut());
2343        let state = store.0.concurrent_state_mut();
2344        let old_thread = state.unwrap_current_guest_thread();
2345
2346        assert_eq!(
2347            state.get_mut(old_thread.task)?.instance,
2348            RuntimeInstance {
2349                instance: self.id().instance(),
2350                index: caller_instance,
2351            }
2352        );
2353
2354        let new_task = GuestTask::new(
2355            state,
2356            Box::new(move |store, dst| {
2357                let mut store = token.as_context_mut(store);
2358                assert!(dst.len() <= MAX_FLAT_PARAMS);
2359                // The `+ 1` here accounts for the return pointer, if any:
2360                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2361                let count = match caller_info {
2362                    // Async callers, if they have a result, use the last
2363                    // parameter as a return pointer so chop that off if
2364                    // relevant here.
2365                    CallerInfo::Async { params, has_result } => {
2366                        let params = &params[..params.len() - usize::from(has_result)];
2367                        for (param, src) in params.iter().zip(&mut src) {
2368                            src.write(*param);
2369                        }
2370                        params.len()
2371                    }
2372
2373                    // Sync callers forward everything directly.
2374                    CallerInfo::Sync { params, .. } => {
2375                        for (param, src) in params.iter().zip(&mut src) {
2376                            src.write(*param);
2377                        }
2378                        params.len()
2379                    }
2380                };
2381                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2382                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2383                // how it was constructed (see
2384                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2385                // for details) we know it takes `count` parameters and returns
2386                // `dst.len()` results.
2387                unsafe {
2388                    crate::Func::call_unchecked_raw(
2389                        &mut store,
2390                        start.as_non_null(),
2391                        NonNull::new(
2392                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2393                        )
2394                        .unwrap(),
2395                    )?;
2396                }
2397                dst.copy_from_slice(&src[..dst.len()]);
2398                let state = store.0.concurrent_state_mut();
2399                Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2400                    state,
2401                    Some(Event::Subtask {
2402                        status: Status::Started,
2403                    }),
2404                )?;
2405                Ok(())
2406            }),
2407            LiftResult {
2408                lift: Box::new(move |store, src| {
2409                    // SAFETY: See comment in closure passed as `lower_params`
2410                    // parameter above.
2411                    let mut store = token.as_context_mut(store);
2412                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2413                    if let ResultInfo::Heap { results } = &result_info {
2414                        my_src.push(ValRaw::u32(*results));
2415                    }
2416                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2417                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2418                    // on how it was constructed (see
2419                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2420                    // for details) we know it takes `src.len()` parameters and
2421                    // returns up to 1 result.
2422                    unsafe {
2423                        crate::Func::call_unchecked_raw(
2424                            &mut store,
2425                            return_.as_non_null(),
2426                            my_src.as_mut_slice().into(),
2427                        )?;
2428                    }
2429                    let state = store.0.concurrent_state_mut();
2430                    let thread = state.unwrap_current_guest_thread();
2431                    if sync_caller {
2432                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2433                            if let ResultInfo::Stack { result_count } = &result_info {
2434                                match result_count {
2435                                    0 => None,
2436                                    1 => Some(my_src[0]),
2437                                    _ => unreachable!(),
2438                                }
2439                            } else {
2440                                None
2441                            },
2442                        );
2443                    }
2444                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2445                }),
2446                ty: task_return_type,
2447                memory: NonNull::new(memory).map(SendSyncPtr::new),
2448                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2449            },
2450            Caller::Guest { thread: old_thread },
2451            None,
2452            RuntimeInstance {
2453                instance: self.id().instance(),
2454                index: callee_instance,
2455            },
2456            callee_async,
2457        )?;
2458
2459        let guest_task = state.push(new_task)?;
2460        let new_thread = GuestThread::new_implicit(guest_task);
2461        let guest_thread = state.push(new_thread)?;
2462        state.get_mut(guest_task)?.threads.insert(guest_thread);
2463
2464        store
2465            .0
2466            .concurrent_state_mut()
2467            .get_mut(old_thread.task)?
2468            .subtasks
2469            .insert(guest_task);
2470
2471        // Make the new thread the current one so that `Self::start_call` knows
2472        // which one to start.
2473        store.0.set_thread(QualifiedThreadId {
2474            task: guest_task,
2475            thread: guest_thread,
2476        });
2477        log::trace!(
2478            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2479        );
2480
2481        Ok(())
2482    }
2483
2484    /// Call the specified callback function for an async-lifted export.
2485    ///
2486    /// SAFETY: `function` must be a valid reference to a guest function of the
2487    /// correct signature for a callback.
2488    unsafe fn call_callback<T>(
2489        self,
2490        mut store: StoreContextMut<T>,
2491        function: SendSyncPtr<VMFuncRef>,
2492        event: Event,
2493        handle: u32,
2494    ) -> Result<u32> {
2495        let (ordinal, result) = event.parts();
2496        let params = &mut [
2497            ValRaw::u32(ordinal),
2498            ValRaw::u32(handle),
2499            ValRaw::u32(result),
2500        ];
2501        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2502        // `wasmtime-cranelift`-generated fused adapter code or
2503        // `component::Options`.  Per `wasmparser` callback signature
2504        // validation, we know it takes three parameters and returns one.
2505        unsafe {
2506            crate::Func::call_unchecked_raw(
2507                &mut store,
2508                function.as_non_null(),
2509                params.as_mut_slice().into(),
2510            )?;
2511        }
2512        Ok(params[0].get_u32())
2513    }
2514
2515    /// Start a guest->guest call previously prepared using
2516    /// `Self::prepare_call`.
2517    ///
2518    /// This is called from fused adapter code generated in
2519    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2520    /// this function immediately after calling `Self::prepare_call`.
2521    ///
2522    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2523    /// functions with the appropriate signatures for the current guest task.
2524    /// If this is a call to an async-lowered import, the actual call may be
2525    /// deferred and run after this function returns, in which case the pointer
2526    /// arguments must also be valid when the call happens.
2527    unsafe fn start_call<T: 'static>(
2528        self,
2529        mut store: StoreContextMut<T>,
2530        callback: *mut VMFuncRef,
2531        post_return: *mut VMFuncRef,
2532        callee: *mut VMFuncRef,
2533        param_count: u32,
2534        result_count: u32,
2535        flags: u32,
2536        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2537    ) -> Result<u32> {
2538        let token = StoreToken::new(store.as_context_mut());
2539        let async_caller = storage.is_none();
2540        let state = store.0.concurrent_state_mut();
2541        let guest_thread = state.unwrap_current_guest_thread();
2542        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2543        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2544        let param_count = usize::try_from(param_count).unwrap();
2545        assert!(param_count <= MAX_FLAT_PARAMS);
2546        let result_count = usize::try_from(result_count).unwrap();
2547        assert!(result_count <= MAX_FLAT_RESULTS);
2548
2549        let task = state.get_mut(guest_thread.task)?;
2550        if !callback.is_null() {
2551            // We're calling an async-lifted export with a callback, so store
2552            // the callback and related context as part of the task so we can
2553            // call it later when needed.
2554            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2555            task.callback = Some(Box::new(move |store, event, handle| {
2556                let store = token.as_context_mut(store);
2557                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2558            }));
2559        }
2560
2561        let Caller::Guest { thread: caller } = &task.caller else {
2562            // As of this writing, `start_call` is only used for guest->guest
2563            // calls.
2564            unreachable!()
2565        };
2566        let caller = *caller;
2567        let caller_instance = state.get_mut(caller.task)?.instance;
2568
2569        // Queue the call as a "high priority" work item.
2570        unsafe {
2571            self.queue_call(
2572                store.as_context_mut(),
2573                guest_thread,
2574                callee,
2575                param_count,
2576                result_count,
2577                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2578                NonNull::new(callback).map(SendSyncPtr::new),
2579                NonNull::new(post_return).map(SendSyncPtr::new),
2580            )?;
2581        }
2582
2583        let state = store.0.concurrent_state_mut();
2584
2585        // Use the caller's `GuestTask::sync_call_set` to register interest in
2586        // the subtask...
2587        let guest_waitable = Waitable::Guest(guest_thread.task);
2588        let old_set = guest_waitable.common(state)?.set;
2589        let set = state.get_mut(caller.task)?.sync_call_set;
2590        guest_waitable.join(state, Some(set))?;
2591
2592        // ... and suspend this fiber temporarily while we wait for it to start.
2593        //
2594        // Note that we _could_ call the callee directly using the current fiber
2595        // rather than suspend this one, but that would make reasoning about the
2596        // event loop more complicated and is probably only worth doing if
2597        // there's a measurable performance benefit.  In addition, it would mean
2598        // blocking the caller if the callee calls a blocking sync-lowered
2599        // import, and as of this writing the spec says we must not do that.
2600        //
2601        // Alternatively, the fused adapter code could be modified to call the
2602        // callee directly without calling a host-provided intrinsic at all (in
2603        // which case it would need to do its own, inline backpressure checks,
2604        // etc.).  Again, we'd want to see a measurable performance benefit
2605        // before committing to such an optimization.  And again, we'd need to
2606        // update the spec to allow that.
2607        let (status, waitable) = loop {
2608            store.0.suspend(SuspendReason::Waiting {
2609                set,
2610                thread: caller,
2611                // Normally, `StoreOpaque::suspend` would assert it's being
2612                // called from a context where blocking is allowed.  However, if
2613                // `async_caller` is `true`, we'll only "block" long enough for
2614                // the callee to start, i.e. we won't repeat this loop, so we
2615                // tell `suspend` it's okay even if we're not allowed to block.
2616                // Alternatively, if the callee is not an async function, then
2617                // we know it won't block anyway.
2618                skip_may_block_check: async_caller || !callee_async,
2619            })?;
2620
2621            let state = store.0.concurrent_state_mut();
2622
2623            log::trace!("taking event for {:?}", guest_thread.task);
2624            let event = guest_waitable.take_event(state)?;
2625            let Some(Event::Subtask { status }) = event else {
2626                unreachable!();
2627            };
2628
2629            log::trace!("status {status:?} for {:?}", guest_thread.task);
2630
2631            if status == Status::Returned {
2632                // It returned, so we can stop waiting.
2633                break (status, None);
2634            } else if async_caller {
2635                // It hasn't returned yet, but the caller is calling via an
2636                // async-lowered import, so we generate a handle for the task
2637                // waitable and return the status.
2638                let handle = store
2639                    .0
2640                    .instance_state(caller_instance)
2641                    .handle_table()
2642                    .subtask_insert_guest(guest_thread.task.rep())?;
2643                store
2644                    .0
2645                    .concurrent_state_mut()
2646                    .get_mut(guest_thread.task)?
2647                    .common
2648                    .handle = Some(handle);
2649                break (status, Some(handle));
2650            } else {
2651                // The callee hasn't returned yet, and the caller is calling via
2652                // a sync-lowered import, so we loop and keep waiting until the
2653                // callee returns.
2654            }
2655        };
2656
2657        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2658
2659        // Reset the current thread to point to the caller as it resumes control.
2660        store.0.set_thread(caller);
2661        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2662        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2663
2664        if let Some(storage) = storage {
2665            // The caller used a sync-lowered import to call an async-lifted
2666            // export, in which case the result, if any, has been stashed in
2667            // `GuestTask::sync_result`.
2668            let state = store.0.concurrent_state_mut();
2669            let task = state.get_mut(guest_thread.task)?;
2670            if let Some(result) = task.sync_result.take() {
2671                if let Some(result) = result {
2672                    storage[0] = MaybeUninit::new(result);
2673                }
2674
2675                if task.exited && task.ready_to_delete() {
2676                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2677                }
2678            }
2679        }
2680
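        // Pack the final status together with the waitable handle (if any)
        // into the single `u32` representation expected by the caller.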
2681        Ok(status.pack(waitable))
2682    }
2683
2684    /// Poll the specified future once on behalf of a guest->host call using an
2685    /// async-lowered import.
2686    ///
2687    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2688    /// `Pending`, add it to the set of futures to be polled as part of this
2689    /// instance's event loop until it completes, and then return
2690    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2691    ///
2692    /// Whether the future returns `Ready` immediately or later, the `lower`
2693    /// function will be used to lower the result, if any, into the guest caller's
2694    /// stack and linear memory unless the task has been cancelled.
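    ///
    /// # Example (illustrative sketch)
    ///
    /// A concurrent host intrinsic might use this roughly as follows, where
    /// `do_host_work` (assumed to return a `Result`) and `lower_into_guest`
    /// are hypothetical placeholders rather than real APIs:
    ///
    /// ```ignore
    /// let waitable = instance.first_poll(
    ///     store.as_context_mut(),
    ///     async move { do_host_work().await },
    ///     |store, value| lower_into_guest(store, value),
    /// )?;
    /// match waitable {
    ///     // Completed eagerly; the result has already been lowered.
    ///     None => { /* report the call as having returned */ }
    ///     // Still pending; the future now lives in the event loop.
    ///     Some(handle) => { /* report the call as blocked and hand `handle` to the guest */ }
    /// }
    /// ```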
2695    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2696        self,
2697        mut store: StoreContextMut<'_, T>,
2698        future: impl Future<Output = Result<R>> + Send + 'static,
2699        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2700    ) -> Result<Option<u32>> {
2701        let token = StoreToken::new(store.as_context_mut());
2702        let state = store.0.concurrent_state_mut();
2703        let task = state.unwrap_current_host_thread();
2704
2705        // Create an abortable future which hooks calls to poll and manages call
2706        // context state for the future.
2707        let (join_handle, future) = JoinHandle::run(future);
2708        {
2709            let task = state.get_mut(task)?;
2710            assert!(task.join_handle.is_none());
2711            task.join_handle = Some(join_handle);
2712        }
2713
2714        let mut future = Box::pin(future);
2715
2716        // Finally, poll the future.  We can use a dummy `Waker` here because
2717        // we'll add the future to `ConcurrentState::futures` and poll it
2718        // automatically from the event loop if it doesn't complete immediately
2719        // here.
2720        let poll = tls::set(store.0, || {
2721            future
2722                .as_mut()
2723                .poll(&mut Context::from_waker(&Waker::noop()))
2724        });
2725
2726        match poll {
2727            // It finished immediately; lower the result and delete the task.
2728            Poll::Ready(Some(result)) => {
2729                lower(store.as_context_mut(), result?)?;
2730                return Ok(None);
2731            }
2732
2733            // Shouldn't be possible since the future isn't cancelled via the
2734            // `join_handle`.
2735            Poll::Ready(None) => unreachable!(),
2736
2737            // Future isn't ready yet, so fall through.
2738            Poll::Pending => {}
2739        }
2740
2741        // It hasn't finished yet; add the future to
2742        // `ConcurrentState::futures` so the event loop will poll it, and
2743        // allocate a waitable handle to return to the guest.
2744
2745        // Wrap the future in a closure responsible for lowering the result into
2746        // the guest's stack and memory, as well as notifying any waiters that
2747        // the task returned.
2748        let future = Box::pin(async move {
2749            let result = match future.await {
2750                Some(result) => result?,
2751                // Task was cancelled; nothing left to do.
2752                None => return Ok(()),
2753            };
2754            let on_complete = move |store: &mut dyn VMStore| {
2755                // Restore the `current_thread` to be this host task so `lower`
2756                // knows how to manipulate borrows and which scope of borrows to
2757                // check.
2758                let mut store = token.as_context_mut(store);
2759                let state = store.0.concurrent_state_mut();
2760                assert!(state.current_thread.is_none());
2761                store.0.set_thread(task);
2762
2763                lower(store.as_context_mut(), result)?;
2764                let state = store.0.concurrent_state_mut();
2765                state.get_mut(task)?.join_handle.take();
2766                Waitable::Host(task).set_event(
2767                    state,
2768                    Some(Event::Subtask {
2769                        status: Status::Returned,
2770                    }),
2771                )?;
2772
2773                // Go back to "no current thread" at the end.
2774                store.0.set_thread(CurrentThread::None);
2775                Ok(())
2776            };
2777
2778            // Here we schedule a task to run on a worker fiber to do the
2779            // lowering since it may involve a call to the guest's realloc
2780            // function. This is necessary because calling the guest while
2781            // there are host embedder frames on the stack is unsound.
2782            tls::get(move |store| {
2783                store
2784                    .concurrent_state_mut()
2785                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2786                        on_complete,
2787                    ))));
2788                Ok(())
2789            })
2790        });
2791
2792        // Make this task visible to the guest and then record what it
2793        // was made visible as.
2794        let state = store.0.concurrent_state_mut();
2795        state.push_future(future);
2796        let caller = state.get_mut(task)?.caller;
2797        let instance = state.get_mut(caller.task)?.instance;
2798        let handle = store
2799            .0
2800            .instance_state(instance)
2801            .handle_table()
2802            .subtask_insert_host(task.rep())?;
2803        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2804        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2805
2806        // Restore the currently running thread to this host task's
2807        // caller. Note that the host task isn't deallocated here; it
2808        // remains in the store and will be deleted later.
2809        store.0.set_thread(caller);
2810        Ok(Some(handle))
2811    }
2812
2813    /// Implements the `task.return` intrinsic, lifting the result for the
2814    /// current guest task.
2815    pub(crate) fn task_return(
2816        self,
2817        store: &mut dyn VMStore,
2818        ty: TypeTupleIndex,
2819        options: OptionsIndex,
2820        storage: &[ValRaw],
2821    ) -> Result<()> {
2822        let state = store.concurrent_state_mut();
2823        let guest_thread = state.unwrap_current_guest_thread();
2824        let lift = state
2825            .get_mut(guest_thread.task)?
2826            .lift_result
2827            .take()
2828            .ok_or_else(|| {
2829                format_err!("`task.return` or `task.cancel` called more than once for current task")
2830            })?;
2831        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2832
2833        let CanonicalOptions {
2834            string_encoding,
2835            data_model,
2836            ..
2837        } = &self.id().get(store).component().env_component().options[options];
2838
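        // The options passed to `task.return` must match those recorded when
        // the result lifter was created (see `LiftResult`): the type, string
        // encoding, and memory are compared below and a mismatch traps.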
2839        let invalid = ty != lift.ty
2840            || string_encoding != &lift.string_encoding
2841            || match data_model {
2842                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2843                    Some(memory) => {
2844                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2845                        let actual = self.id().get(store).runtime_memory(memory);
2846                        expected != actual.as_ptr()
2847                    }
2848                    // Memory not specified, meaning validation determined it
2849                    // wasn't required, so this is not invalid.
2850                    None => false,
2851                },
2852                // Always invalid as this isn't supported.
2853                CanonicalOptionsDataModel::Gc { .. } => true,
2854            };
2855
2856        if invalid {
2857            bail!("invalid `task.return` signature and/or options for current task");
2858        }
2859
2860        log::trace!("task.return for {guest_thread:?}");
2861
2862        let result = (lift.lift)(store, storage)?;
2863        self.task_complete(store, guest_thread.task, result, Status::Returned)
2864    }
2865
2866    /// Implements the `task.cancel` intrinsic.
2867    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2868        let state = store.concurrent_state_mut();
2869        let guest_thread = state.unwrap_current_guest_thread();
2870        let task = state.get_mut(guest_thread.task)?;
2871        if !task.cancel_sent {
2872            bail!("`task.cancel` called by task which has not been cancelled")
2873        }
2874        _ = task.lift_result.take().ok_or_else(|| {
2875            format_err!("`task.return` or `task.cancel` called more than once for current task")
2876        })?;
2877
2878        assert!(task.result.is_none());
2879
2880        log::trace!("task.cancel for {guest_thread:?}");
2881
2882        self.task_complete(
2883            store,
2884            guest_thread.task,
2885            Box::new(DummyResult),
2886            Status::ReturnCancelled,
2887        )
2888    }
2889
2890    /// Complete the specified guest task (i.e. indicate that it has either
2891    /// returned a (possibly empty) result or cancelled itself).
2892    ///
2893    /// This will return any resource borrows and notify any current or future
2894    /// waiters that the task has completed.
2895    fn task_complete(
2896        self,
2897        store: &mut StoreOpaque,
2898        guest_task: TableId<GuestTask>,
2899        result: Box<dyn Any + Send + Sync>,
2900        status: Status,
2901    ) -> Result<()> {
2902        store
2903            .component_resource_tables(Some(self))
2904            .validate_scope_exit()?;
2905
2906        let state = store.concurrent_state_mut();
2907        let task = state.get_mut(guest_task)?;
2908
2909        if let Caller::Host { tx, .. } = &mut task.caller {
2910            if let Some(tx) = tx.take() {
2911                _ = tx.send(result);
2912            }
2913        } else {
2914            task.result = Some(result);
2915            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2916        }
2917
2918        Ok(())
2919    }
2920
2921    /// Implements the `waitable-set.new` intrinsic.
2922    pub(crate) fn waitable_set_new(
2923        self,
2924        store: &mut StoreOpaque,
2925        caller_instance: RuntimeComponentInstanceIndex,
2926    ) -> Result<u32> {
2927        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2928        let handle = store
2929            .instance_state(RuntimeInstance {
2930                instance: self.id().instance(),
2931                index: caller_instance,
2932            })
2933            .handle_table()
2934            .waitable_set_insert(set.rep())?;
2935        log::trace!("new waitable set {set:?} (handle {handle})");
2936        Ok(handle)
2937    }
2938
2939    /// Implements the `waitable-set.drop` intrinsic.
2940    pub(crate) fn waitable_set_drop(
2941        self,
2942        store: &mut StoreOpaque,
2943        caller_instance: RuntimeComponentInstanceIndex,
2944        set: u32,
2945    ) -> Result<()> {
2946        let rep = store
2947            .instance_state(RuntimeInstance {
2948                instance: self.id().instance(),
2949                index: caller_instance,
2950            })
2951            .handle_table()
2952            .waitable_set_remove(set)?;
2953
2954        log::trace!("drop waitable set {rep} (handle {set})");
2955
2956        let set = store
2957            .concurrent_state_mut()
2958            .delete(TableId::<WaitableSet>::new(rep))?;
2959
2960        if !set.waiting.is_empty() {
2961            bail!("cannot drop waitable set with waiters");
2962        }
2963
2964        Ok(())
2965    }
2966
2967    /// Implements the `waitable.join` intrinsic.
2968    pub(crate) fn waitable_join(
2969        self,
2970        store: &mut StoreOpaque,
2971        caller_instance: RuntimeComponentInstanceIndex,
2972        waitable_handle: u32,
2973        set_handle: u32,
2974    ) -> Result<()> {
2975        let mut instance = self.id().get_mut(store);
2976        let waitable =
2977            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2978
2979        let set = if set_handle == 0 {
2980            None
2981        } else {
2982            let set = instance.instance_states().0[caller_instance]
2983                .handle_table()
2984                .waitable_set_rep(set_handle)?;
2985
2986            Some(TableId::<WaitableSet>::new(set))
2987        };
2988
2989        log::trace!(
2990            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2991        );
2992
2993        waitable.join(store.concurrent_state_mut(), set)
2994    }
2995
2996    /// Implements the `subtask.drop` intrinsic.
2997    pub(crate) fn subtask_drop(
2998        self,
2999        store: &mut StoreOpaque,
3000        caller_instance: RuntimeComponentInstanceIndex,
3001        task_id: u32,
3002    ) -> Result<()> {
3003        self.waitable_join(store, caller_instance, task_id, 0)?;
3004
3005        let (rep, is_host) = store
3006            .instance_state(RuntimeInstance {
3007                instance: self.id().instance(),
3008                index: caller_instance,
3009            })
3010            .handle_table()
3011            .subtask_remove(task_id)?;
3012
3013        let concurrent_state = store.concurrent_state_mut();
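        // A subtask handle may refer to either a host task or a guest task;
        // in both cases it may only be dropped once it has resolved and any
        // pending event has been delivered.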
3014        let (waitable, expected_caller, delete) = if is_host {
3015            let id = TableId::<HostTask>::new(rep);
3016            let task = concurrent_state.get_mut(id)?;
3017            if task.join_handle.is_some() {
3018                bail!("cannot drop a subtask which has not yet resolved");
3019            }
3020            (Waitable::Host(id), task.caller, true)
3021        } else {
3022            let id = TableId::<GuestTask>::new(rep);
3023            let task = concurrent_state.get_mut(id)?;
3024            if task.lift_result.is_some() {
3025                bail!("cannot drop a subtask which has not yet resolved");
3026            }
3027            if let Caller::Guest { thread } = task.caller {
3028                (
3029                    Waitable::Guest(id),
3030                    thread,
3031                    concurrent_state.get_mut(id)?.exited,
3032                )
3033            } else {
3034                unreachable!()
3035            }
3036        };
3037
3038        waitable.common(concurrent_state)?.handle = None;
3039
3040        if waitable.take_event(concurrent_state)?.is_some() {
3041            bail!("cannot drop a subtask with an undelivered event");
3042        }
3043
3044        if delete {
3045            waitable.delete_from(concurrent_state)?;
3046        }
3047
3048        // Since waitables can neither be passed between instances nor forged,
3049        // this should never fail unless there's a bug in Wasmtime, but we check
3050        // here to be sure:
3051        assert_eq!(
3052            expected_caller,
3053            concurrent_state.unwrap_current_guest_thread(),
3054        );
3055        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3056        Ok(())
3057    }
3058
3059    /// Implements the `waitable-set.wait` intrinsic.
3060    pub(crate) fn waitable_set_wait(
3061        self,
3062        store: &mut StoreOpaque,
3063        options: OptionsIndex,
3064        set: u32,
3065        payload: u32,
3066    ) -> Result<u32> {
3067        if !self.options(store, options).async_ {
3068            // The caller may only call `waitable-set.wait` from an async task
3069            // (i.e. a task created via a call to an async export).
3070            // Otherwise, we'll trap.
3071            store.check_blocking()?;
3072        }
3073
3074        let &CanonicalOptions {
3075            cancellable,
3076            instance: caller_instance,
3077            ..
3078        } = &self.id().get(store).component().env_component().options[options];
3079        let rep = store
3080            .instance_state(RuntimeInstance {
3081                instance: self.id().instance(),
3082                index: caller_instance,
3083            })
3084            .handle_table()
3085            .waitable_set_rep(set)?;
3086
3087        self.waitable_check(
3088            store,
3089            cancellable,
3090            WaitableCheck::Wait,
3091            WaitableCheckParams {
3092                set: TableId::new(rep),
3093                options,
3094                payload,
3095            },
3096        )
3097    }
3098
3099    /// Implements the `waitable-set.poll` intrinsic.
3100    pub(crate) fn waitable_set_poll(
3101        self,
3102        store: &mut StoreOpaque,
3103        options: OptionsIndex,
3104        set: u32,
3105        payload: u32,
3106    ) -> Result<u32> {
3107        let &CanonicalOptions {
3108            cancellable,
3109            instance: caller_instance,
3110            ..
3111        } = &self.id().get(store).component().env_component().options[options];
3112        let rep = store
3113            .instance_state(RuntimeInstance {
3114                instance: self.id().instance(),
3115                index: caller_instance,
3116            })
3117            .handle_table()
3118            .waitable_set_rep(set)?;
3119
3120        self.waitable_check(
3121            store,
3122            cancellable,
3123            WaitableCheck::Poll,
3124            WaitableCheckParams {
3125                set: TableId::new(rep),
3126                options,
3127                payload,
3128            },
3129        )
3130    }
3131
3132    /// Implements the `thread.index` intrinsic.
3133    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3134        let thread_id = store
3135            .concurrent_state_mut()
3136            .unwrap_current_guest_thread()
3137            .thread;
3138        // The unwrap is safe because `instance_rep` must be `Some` by this point
3139        Ok(store
3140            .concurrent_state_mut()
3141            .get_mut(thread_id)?
3142            .instance_rep
3143            .unwrap())
3144    }
3145
3146    /// Implements the `thread.new-indirect` intrinsic.
3147    pub(crate) fn thread_new_indirect<T: 'static>(
3148        self,
3149        mut store: StoreContextMut<T>,
3150        runtime_instance: RuntimeComponentInstanceIndex,
3151        _func_ty_idx: TypeFuncIndex, // currently unused
3152        start_func_table_idx: RuntimeTableIndex,
3153        start_func_idx: u32,
3154        context: i32,
3155    ) -> Result<u32> {
3156        log::trace!("creating new thread");
3157
3158        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3159        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3160        let callee = instance
3161            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3162            .ok_or_else(|| {
3163                format_err!("the start function index points to an uninitialized function")
3164            })?;
3165        if callee.type_index(store.0) != start_func_ty.type_index() {
3166            bail!(
3167                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3168            );
3169        }
3170
3171        let token = StoreToken::new(store.as_context_mut());
3172        let start_func = Box::new(
3173            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3174                let old_thread = store.set_thread(guest_thread);
3175                log::trace!(
3176                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3177                );
3178
3179                let mut store = token.as_context_mut(store);
3180                let mut params = [ValRaw::i32(context)];
3181                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3182                // on a separate fiber if we're running in an async store.
3183                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3184
3185                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3186                log::trace!("explicit thread {guest_thread:?} completed");
3187                let state = store.0.concurrent_state_mut();
3188                let task = state.get_mut(guest_thread.task)?;
3189                if task.threads.is_empty() && !task.returned_or_cancelled() {
3190                    bail!(Trap::NoAsyncResult);
3191                }
3192                store.0.set_thread(old_thread);
3193                let state = store.0.concurrent_state_mut();
3194                old_thread
3195                    .guest()
3196                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3197                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3198                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3199                }
3200                log::trace!("thread start: restored {old_thread:?} as current thread");
3201
3202                Ok(())
3203            },
3204        );
3205
3206        let state = store.0.concurrent_state_mut();
3207        let current_thread = state.unwrap_current_guest_thread();
3208        let parent_task = current_thread.task;
3209
3210        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3211        let thread_id = state.push(new_thread)?;
3212        state.get_mut(parent_task)?.threads.insert(thread_id);
3213
3214        log::trace!("new thread with id {thread_id:?} created");
3215
3216        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3217    }
3218
3219    pub(crate) fn resume_suspended_thread(
3220        self,
3221        store: &mut StoreOpaque,
3222        runtime_instance: RuntimeComponentInstanceIndex,
3223        thread_idx: u32,
3224        high_priority: bool,
3225    ) -> Result<()> {
3226        let thread_id =
3227            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3228        let state = store.concurrent_state_mut();
3229        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3230        let thread = state.get_mut(guest_thread.thread)?;
3231
3232        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3233            GuestThreadState::NotStartedExplicit(start_func) => {
3234                log::trace!("starting thread {guest_thread:?}");
3235                let guest_call = WorkItem::GuestCall(GuestCall {
3236                    thread: guest_thread,
3237                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3238                        start_func(store, guest_thread)
3239                    })),
3240                });
3241                store
3242                    .concurrent_state_mut()
3243                    .push_work_item(guest_call, high_priority);
3244            }
3245            GuestThreadState::Suspended(fiber) => {
3246                log::trace!("resuming thread {thread_id:?} that was suspended");
3247                store
3248                    .concurrent_state_mut()
3249                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3250            }
3251            _ => {
3252                bail!("cannot resume thread which is not suspended");
3253            }
3254        }
3255        Ok(())
3256    }
3257
3258    fn add_guest_thread_to_instance_table(
3259        self,
3260        thread_id: TableId<GuestThread>,
3261        store: &mut StoreOpaque,
3262        runtime_instance: RuntimeComponentInstanceIndex,
3263    ) -> Result<u32> {
3264        let guest_id = store
3265            .instance_state(RuntimeInstance {
3266                instance: self.id().instance(),
3267                index: runtime_instance,
3268            })
3269            .thread_handle_table()
3270            .guest_thread_insert(thread_id.rep())?;
3271        store
3272            .concurrent_state_mut()
3273            .get_mut(thread_id)?
3274            .instance_rep = Some(guest_id);
3275        Ok(guest_id)
3276    }
3277
3278    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3279    /// and `thread.switch-to` intrinsics.
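    ///
    /// The parameter combinations map onto the intrinsics as follows:
    ///
    /// - `yielding == true`, `to_thread == None`: `thread.yield`
    /// - `yielding == true`, `to_thread == Some(_)`: `thread.yield-to`
    /// - `yielding == false`, `to_thread == None`: `thread.suspend`
    /// - `yielding == false`, `to_thread == Some(_)`: `thread.switch-to`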
3280    pub(crate) fn suspension_intrinsic(
3281        self,
3282        store: &mut StoreOpaque,
3283        caller: RuntimeComponentInstanceIndex,
3284        cancellable: bool,
3285        yielding: bool,
3286        to_thread: Option<u32>,
3287    ) -> Result<WaitResult> {
3288        let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3289        if to_thread.is_none() {
3290            let state = store.concurrent_state_mut();
3291            if yielding {
3292                // This is a `thread.yield` call
3293                if !state.may_block(guest_thread.task) {
3294                    // The spec defines `thread.yield` to be a no-op in a
3295                    // non-blocking context, so we return immediately without giving
3296                    // any other thread a chance to run.
3297                    return Ok(WaitResult::Completed);
3298                }
3299            } else {
3300                // The caller may only call `thread.suspend` from an async task
3301                // (i.e. a task created via a call to an async export).
3302                // Otherwise, we'll trap.
3303                store.check_blocking()?;
3304            }
3305        }
3306
3307        // There could be a pending cancellation from a previous uncancellable wait
3308        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3309            return Ok(WaitResult::Cancelled);
3310        }
3311
3312        if let Some(thread) = to_thread {
3313            self.resume_suspended_thread(store, caller, thread, true)?;
3314        }
3315
3316        let reason = if yielding {
3317            SuspendReason::Yielding {
3318                thread: guest_thread,
3319                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3320                // we're handling a `thread.yield-to` call; otherwise it would
3321                // panic if we called it in a non-blocking context.
3322                skip_may_block_check: to_thread.is_some(),
3323            }
3324        } else {
3325            SuspendReason::ExplicitlySuspending {
3326                thread: guest_thread,
3327                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3328                // we're handling a `thread.switch-to` call; otherwise it would
3329                // panic if we called it in a non-blocking context.
3330                skip_may_block_check: to_thread.is_some(),
3331            }
3332        };
3333
3334        store.suspend(reason)?;
3335
3336        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3337            Ok(WaitResult::Cancelled)
3338        } else {
3339            Ok(WaitResult::Completed)
3340        }
3341    }
3342
3343    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3344    fn waitable_check(
3345        self,
3346        store: &mut StoreOpaque,
3347        cancellable: bool,
3348        check: WaitableCheck,
3349        params: WaitableCheckParams,
3350    ) -> Result<u32> {
3351        let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3352
3353        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3354
3355        let state = store.concurrent_state_mut();
3356        let task = state.get_mut(guest_thread.task)?;
3357
3358        // If we're waiting, and there are no events immediately available,
3359        // suspend the fiber until that changes.
3360        match &check {
3361            WaitableCheck::Wait => {
3362                let set = params.set;
3363
3364                if (task.event.is_none()
3365                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3366                    && state.get_mut(set)?.ready.is_empty()
3367                {
3368                    if cancellable {
3369                        let old = state
3370                            .get_mut(guest_thread.thread)?
3371                            .wake_on_cancel
3372                            .replace(set);
3373                        assert!(old.is_none());
3374                    }
3375
3376                    store.suspend(SuspendReason::Waiting {
3377                        set,
3378                        thread: guest_thread,
3379                        skip_may_block_check: false,
3380                    })?;
3381                }
3382            }
3383            WaitableCheck::Poll => {}
3384        }
3385
3386        log::trace!(
3387            "waitable check for {guest_thread:?}; set {:?}, part two",
3388            params.set
3389        );
3390
3391        // Deliver any pending events to the guest and return.
3392        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3393
3394        let (ordinal, handle, result) = match &check {
3395            WaitableCheck::Wait => {
3396                let (event, waitable) = event.unwrap();
3397                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3398                let (ordinal, result) = event.parts();
3399                (ordinal, handle, result)
3400            }
3401            WaitableCheck::Poll => {
3402                if let Some((event, waitable)) = event {
3403                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3404                    let (ordinal, result) = event.parts();
3405                    (ordinal, handle, result)
3406                } else {
3407                    log::trace!(
3408                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3409                        guest_thread.task,
3410                        params.set
3411                    );
3412                    let (ordinal, result) = Event::None.parts();
3413                    (ordinal, 0, result)
3414                }
3415            }
3416        };
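        // Lower the event into the guest-provided `payload` pointer as two
        // little-endian `u32`s: the waitable handle at offset 0 and the event
        // result at offset 4.  The event ordinal itself is the return value.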
3417        let memory = self.options_memory_mut(store, params.options);
3418        let ptr = func::validate_inbounds_dynamic(
3419            &CanonicalAbiInfo::POINTER_PAIR,
3420            memory,
3421            &ValRaw::u32(params.payload),
3422        )?;
3423        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3424        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3425        Ok(ordinal)
3426    }
3427
3428    /// Implements the `subtask.cancel` intrinsic.
3429    pub(crate) fn subtask_cancel(
3430        self,
3431        store: &mut StoreOpaque,
3432        caller_instance: RuntimeComponentInstanceIndex,
3433        async_: bool,
3434        task_id: u32,
3435    ) -> Result<u32> {
3436        if !async_ {
3437            // The caller may only sync call `subtask.cancel` from an async task
3438            // (i.e. a task created via a call to an async export).  Otherwise,
3439            // we'll trap.
3440            store.check_blocking()?;
3441        }
3442
3443        let (rep, is_host) = store
3444            .instance_state(RuntimeInstance {
3445                instance: self.id().instance(),
3446                index: caller_instance,
3447            })
3448            .handle_table()
3449            .subtask_rep(task_id)?;
3450        let (waitable, expected_caller) = if is_host {
3451            let id = TableId::<HostTask>::new(rep);
3452            (
3453                Waitable::Host(id),
3454                store.concurrent_state_mut().get_mut(id)?.caller,
3455            )
3456        } else {
3457            let id = TableId::<GuestTask>::new(rep);
3458            if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3459                (Waitable::Guest(id), thread)
3460            } else {
3461                unreachable!()
3462            }
3463        };
3464        // Since waitables can neither be passed between instances nor forged,
3465        // this should never fail unless there's a bug in Wasmtime, but we check
3466        // here to be sure:
3467        let concurrent_state = store.concurrent_state_mut();
3468        assert_eq!(
3469            expected_caller,
3470            concurrent_state.unwrap_current_guest_thread(),
3471        );
3472
3473        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3474
3475        if let Waitable::Host(host_task) = waitable {
3476            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3477                handle.abort();
3478                return Ok(Status::ReturnCancelled as u32);
3479            }
3480        } else {
3481            let caller = concurrent_state.unwrap_current_guest_thread();
3482            let guest_task = TableId::<GuestTask>::new(rep);
3483            let task = concurrent_state.get_mut(guest_task)?;
3484            if !task.already_lowered_parameters() {
3485                // The task is in a `starting` state, meaning it hasn't run at
3486                // all yet.  Here we update its fields to indicate that it is
3487                // ready to delete immediately once `subtask.drop` is called.
3488                task.lower_params = None;
3489                task.lift_result = None;
3490                task.exited = true;
3491
3492                let instance = task.instance;
3493
3494                assert_eq!(1, task.threads.len());
3495                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3496                let concurrent_state = store.concurrent_state_mut();
3497                concurrent_state.delete(thread)?;
3498                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3499
3500                // Not yet started; cancel and remove from pending
3501                let pending = &mut store.instance_state(instance).concurrent_state().pending;
3502                let pending_count = pending.len();
3503                pending.retain(|thread, _| thread.task != guest_task);
3504                // If there were no pending threads for this task, we're in an error state
3505                if pending.len() == pending_count {
3506                    bail!("`subtask.cancel` called after terminal status delivered");
3507                }
3508                return Ok(Status::StartCancelled as u32);
3509            } else if !task.returned_or_cancelled() {
3510                // Started, but not yet returned or cancelled; send the
3511                // `CANCELLED` event
3512                task.cancel_sent = true;
3513                // Note that this might overwrite an event that was set earlier
3514                // (e.g. `Event::None` if the task is yielding, or
3515                // `Event::Cancelled` if it was already cancelled), but that's
3516                // okay -- this should supersede the previous state.
3517                task.event = Some(Event::Cancelled);
3518                for thread in task.threads.clone() {
3519                    let thread = QualifiedThreadId {
3520                        task: guest_task,
3521                        thread,
3522                    };
3523                    if let Some(set) = concurrent_state
3524                        .get_mut(thread.thread)
3525                        .unwrap()
3526                        .wake_on_cancel
3527                        .take()
3528                    {
3529                        let item = match concurrent_state
3530                            .get_mut(set)?
3531                            .waiting
3532                            .remove(&thread)
3533                            .unwrap()
3534                        {
3535                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3536                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3537                                thread,
3538                                kind: GuestCallKind::DeliverEvent {
3539                                    instance,
3540                                    set: None,
3541                                },
3542                            }),
3543                        };
3544                        concurrent_state.push_high_priority(item);
3545
3546                        store.suspend(SuspendReason::Yielding {
3547                            thread: caller,
3548                            // `subtask.cancel` is not allowed to be called in a
3549                            // sync context, so we cannot skip the may-block check.
3550                            skip_may_block_check: false,
3551                        })?;
3552                        break;
3553                    }
3554                }
3555
3556                let concurrent_state = store.concurrent_state_mut();
3557                let task = concurrent_state.get_mut(guest_task)?;
3558                if !task.returned_or_cancelled() {
3559                    if async_ {
3560                        return Ok(BLOCKED);
3561                    } else {
3562                        store.wait_for_event(Waitable::Guest(guest_task))?;
3563                    }
3564                }
3565            }
3566        }
3567
3568        let event = waitable.take_event(store.concurrent_state_mut())?;
3569        if let Some(Event::Subtask {
3570            status: status @ (Status::Returned | Status::ReturnCancelled),
3571        }) = event
3572        {
3573            Ok(status as u32)
3574        } else {
3575            bail!("`subtask.cancel` called after terminal status delivered");
3576        }
3577    }
3578
3579    pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3580        store.concurrent_state_mut().context_get(slot)
3581    }
3582
3583    pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3584        store.concurrent_state_mut().context_set(slot, value)
3585    }
3586}
3587
3588/// Trait representing component model ABI async intrinsics and fused adapter
3589/// helper functions.
3590///
3591/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3592/// which must be valid for at least the duration of the call (and possibly for
3593/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3594/// pointers used for async calls).
3595pub trait VMComponentAsyncStore {
3596    /// A helper function for fused adapter modules involving calls where at
3597    /// least one of the caller or callee is async.
3598    ///
3599    /// This helper is not used when the caller and callee both use the sync
3600    /// ABI; it is only used when at least one of them is async.
3601    unsafe fn prepare_call(
3602        &mut self,
3603        instance: Instance,
3604        memory: *mut VMMemoryDefinition,
3605        start: *mut VMFuncRef,
3606        return_: *mut VMFuncRef,
3607        caller_instance: RuntimeComponentInstanceIndex,
3608        callee_instance: RuntimeComponentInstanceIndex,
3609        task_return_type: TypeTupleIndex,
3610        callee_async: bool,
3611        string_encoding: u8,
3612        result_count: u32,
3613        storage: *mut ValRaw,
3614        storage_len: usize,
3615    ) -> Result<()>;
3616
3617    /// A helper function for fused adapter modules involving calls where the
3618    /// caller is sync-lowered but the callee is async-lifted.
3619    unsafe fn sync_start(
3620        &mut self,
3621        instance: Instance,
3622        callback: *mut VMFuncRef,
3623        callee: *mut VMFuncRef,
3624        param_count: u32,
3625        storage: *mut MaybeUninit<ValRaw>,
3626        storage_len: usize,
3627    ) -> Result<()>;
3628
3629    /// A helper function for fused adapter modules involving calls where the
3630    /// caller is async-lowered.
3631    unsafe fn async_start(
3632        &mut self,
3633        instance: Instance,
3634        callback: *mut VMFuncRef,
3635        post_return: *mut VMFuncRef,
3636        callee: *mut VMFuncRef,
3637        param_count: u32,
3638        result_count: u32,
3639        flags: u32,
3640    ) -> Result<u32>;
3641
3642    /// The `future.write` intrinsic.
3643    fn future_write(
3644        &mut self,
3645        instance: Instance,
3646        caller: RuntimeComponentInstanceIndex,
3647        ty: TypeFutureTableIndex,
3648        options: OptionsIndex,
3649        future: u32,
3650        address: u32,
3651    ) -> Result<u32>;
3652
3653    /// The `future.read` intrinsic.
3654    fn future_read(
3655        &mut self,
3656        instance: Instance,
3657        caller: RuntimeComponentInstanceIndex,
3658        ty: TypeFutureTableIndex,
3659        options: OptionsIndex,
3660        future: u32,
3661        address: u32,
3662    ) -> Result<u32>;
3663
3664    /// The `future.drop-writable` intrinsic.
3665    fn future_drop_writable(
3666        &mut self,
3667        instance: Instance,
3668        ty: TypeFutureTableIndex,
3669        writer: u32,
3670    ) -> Result<()>;
3671
3672    /// The `stream.write` intrinsic.
3673    fn stream_write(
3674        &mut self,
3675        instance: Instance,
3676        caller: RuntimeComponentInstanceIndex,
3677        ty: TypeStreamTableIndex,
3678        options: OptionsIndex,
3679        stream: u32,
3680        address: u32,
3681        count: u32,
3682    ) -> Result<u32>;
3683
3684    /// The `stream.read` intrinsic.
3685    fn stream_read(
3686        &mut self,
3687        instance: Instance,
3688        caller: RuntimeComponentInstanceIndex,
3689        ty: TypeStreamTableIndex,
3690        options: OptionsIndex,
3691        stream: u32,
3692        address: u32,
3693        count: u32,
3694    ) -> Result<u32>;
3695
3696    /// The "fast-path" implementation of the `stream.write` intrinsic for
3697    /// "flat" (i.e. memcpy-able) payloads.
3698    fn flat_stream_write(
3699        &mut self,
3700        instance: Instance,
3701        caller: RuntimeComponentInstanceIndex,
3702        ty: TypeStreamTableIndex,
3703        options: OptionsIndex,
3704        payload_size: u32,
3705        payload_align: u32,
3706        stream: u32,
3707        address: u32,
3708        count: u32,
3709    ) -> Result<u32>;
3710
3711    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3712    /// (i.e. memcpy-able) payloads.
3713    fn flat_stream_read(
3714        &mut self,
3715        instance: Instance,
3716        caller: RuntimeComponentInstanceIndex,
3717        ty: TypeStreamTableIndex,
3718        options: OptionsIndex,
3719        payload_size: u32,
3720        payload_align: u32,
3721        stream: u32,
3722        address: u32,
3723        count: u32,
3724    ) -> Result<u32>;
3725
3726    /// The `stream.drop-writable` intrinsic.
3727    fn stream_drop_writable(
3728        &mut self,
3729        instance: Instance,
3730        ty: TypeStreamTableIndex,
3731        writer: u32,
3732    ) -> Result<()>;
3733
3734    /// The `error-context.debug-message` intrinsic.
3735    fn error_context_debug_message(
3736        &mut self,
3737        instance: Instance,
3738        ty: TypeComponentLocalErrorContextTableIndex,
3739        options: OptionsIndex,
3740        err_ctx_handle: u32,
3741        debug_msg_address: u32,
3742    ) -> Result<()>;
3743
3744    /// The `thread.new-indirect` intrinsic
3745    fn thread_new_indirect(
3746        &mut self,
3747        instance: Instance,
3748        caller: RuntimeComponentInstanceIndex,
3749        func_ty_idx: TypeFuncIndex,
3750        start_func_table_idx: RuntimeTableIndex,
3751        start_func_idx: u32,
3752        context: i32,
3753    ) -> Result<u32>;
3754}
3755
3756/// SAFETY: See trait docs.
3757impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3758    unsafe fn prepare_call(
3759        &mut self,
3760        instance: Instance,
3761        memory: *mut VMMemoryDefinition,
3762        start: *mut VMFuncRef,
3763        return_: *mut VMFuncRef,
3764        caller_instance: RuntimeComponentInstanceIndex,
3765        callee_instance: RuntimeComponentInstanceIndex,
3766        task_return_type: TypeTupleIndex,
3767        callee_async: bool,
3768        string_encoding: u8,
3769        result_count_or_max_if_async: u32,
3770        storage: *mut ValRaw,
3771        storage_len: usize,
3772    ) -> Result<()> {
3773        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3774        // this method will have ensured that `storage` is a valid
3775        // pointer containing at least `storage_len` items.
3776        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3777
3778        unsafe {
3779            instance.prepare_call(
3780                StoreContextMut(self),
3781                start,
3782                return_,
3783                caller_instance,
3784                callee_instance,
3785                task_return_type,
3786                callee_async,
3787                memory,
3788                string_encoding,
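                // `result_count_or_max_if_async` is overloaded: the two
                // sentinel values indicate an async caller (without or with a
                // result, respectively), while any other value is the sync
                // caller's actual result count.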
3789                match result_count_or_max_if_async {
3790                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3791                        params,
3792                        has_result: false,
3793                    },
3794                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3795                        params,
3796                        has_result: true,
3797                    },
3798                    result_count => CallerInfo::Sync {
3799                        params,
3800                        result_count,
3801                    },
3802                },
3803            )
3804        }
3805    }
3806
3807    unsafe fn sync_start(
3808        &mut self,
3809        instance: Instance,
3810        callback: *mut VMFuncRef,
3811        callee: *mut VMFuncRef,
3812        param_count: u32,
3813        storage: *mut MaybeUninit<ValRaw>,
3814        storage_len: usize,
3815    ) -> Result<()> {
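        // The caller here is sync-lowered while the callee is async-lifted
        // (note `START_FLAG_ASYNC_CALLEE` below), so the callee's single
        // result is written back through `storage` rather than reported via a
        // waitable handle.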
3816        unsafe {
3817            instance
3818                .start_call(
3819                    StoreContextMut(self),
3820                    callback,
3821                    ptr::null_mut(),
3822                    callee,
3823                    param_count,
3824                    1,
3825                    START_FLAG_ASYNC_CALLEE,
3826                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3827                    // this method will have ensured that `storage` is a valid
3828                    // pointer containing at least `storage_len` items.
3829                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3830                )
3831                .map(drop)
3832        }
3833    }
3834
3835    unsafe fn async_start(
3836        &mut self,
3837        instance: Instance,
3838        callback: *mut VMFuncRef,
3839        post_return: *mut VMFuncRef,
3840        callee: *mut VMFuncRef,
3841        param_count: u32,
3842        result_count: u32,
3843        flags: u32,
3844    ) -> Result<u32> {
3845        unsafe {
3846            instance.start_call(
3847                StoreContextMut(self),
3848                callback,
3849                post_return,
3850                callee,
3851                param_count,
3852                result_count,
3853                flags,
3854                None,
3855            )
3856        }
3857    }
3858
3859    fn future_write(
3860        &mut self,
3861        instance: Instance,
3862        caller: RuntimeComponentInstanceIndex,
3863        ty: TypeFutureTableIndex,
3864        options: OptionsIndex,
3865        future: u32,
3866        address: u32,
3867    ) -> Result<u32> {
3868        instance
3869            .guest_write(
3870                StoreContextMut(self),
3871                caller,
3872                TransmitIndex::Future(ty),
3873                options,
3874                None,
3875                future,
3876                address,
3877                1,
3878            )
3879            .map(|result| result.encode())
3880    }
3881
3882    fn future_read(
3883        &mut self,
3884        instance: Instance,
3885        caller: RuntimeComponentInstanceIndex,
3886        ty: TypeFutureTableIndex,
3887        options: OptionsIndex,
3888        future: u32,
3889        address: u32,
3890    ) -> Result<u32> {
3891        instance
3892            .guest_read(
3893                StoreContextMut(self),
3894                caller,
3895                TransmitIndex::Future(ty),
3896                options,
3897                None,
3898                future,
3899                address,
3900                1,
3901            )
3902            .map(|result| result.encode())
3903    }
3904
3905    fn stream_write(
3906        &mut self,
3907        instance: Instance,
3908        caller: RuntimeComponentInstanceIndex,
3909        ty: TypeStreamTableIndex,
3910        options: OptionsIndex,
3911        stream: u32,
3912        address: u32,
3913        count: u32,
3914    ) -> Result<u32> {
3915        instance
3916            .guest_write(
3917                StoreContextMut(self),
3918                caller,
3919                TransmitIndex::Stream(ty),
3920                options,
3921                None,
3922                stream,
3923                address,
3924                count,
3925            )
3926            .map(|result| result.encode())
3927    }
3928
3929    fn stream_read(
3930        &mut self,
3931        instance: Instance,
3932        caller: RuntimeComponentInstanceIndex,
3933        ty: TypeStreamTableIndex,
3934        options: OptionsIndex,
3935        stream: u32,
3936        address: u32,
3937        count: u32,
3938    ) -> Result<u32> {
3939        instance
3940            .guest_read(
3941                StoreContextMut(self),
3942                caller,
3943                TransmitIndex::Stream(ty),
3944                options,
3945                None,
3946                stream,
3947                address,
3948                count,
3949            )
3950            .map(|result| result.encode())
3951    }
3952
3953    fn future_drop_writable(
3954        &mut self,
3955        instance: Instance,
3956        ty: TypeFutureTableIndex,
3957        writer: u32,
3958    ) -> Result<()> {
3959        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3960    }
3961
3962    fn flat_stream_write(
3963        &mut self,
3964        instance: Instance,
3965        caller: RuntimeComponentInstanceIndex,
3966        ty: TypeStreamTableIndex,
3967        options: OptionsIndex,
3968        payload_size: u32,
3969        payload_align: u32,
3970        stream: u32,
3971        address: u32,
3972        count: u32,
3973    ) -> Result<u32> {
3974        instance
3975            .guest_write(
3976                StoreContextMut(self),
3977                caller,
3978                TransmitIndex::Stream(ty),
3979                options,
3980                Some(FlatAbi {
3981                    size: payload_size,
3982                    align: payload_align,
3983                }),
3984                stream,
3985                address,
3986                count,
3987            )
3988            .map(|result| result.encode())
3989    }
3990
3991    fn flat_stream_read(
3992        &mut self,
3993        instance: Instance,
3994        caller: RuntimeComponentInstanceIndex,
3995        ty: TypeStreamTableIndex,
3996        options: OptionsIndex,
3997        payload_size: u32,
3998        payload_align: u32,
3999        stream: u32,
4000        address: u32,
4001        count: u32,
4002    ) -> Result<u32> {
4003        instance
4004            .guest_read(
4005                StoreContextMut(self),
4006                caller,
4007                TransmitIndex::Stream(ty),
4008                options,
4009                Some(FlatAbi {
4010                    size: payload_size,
4011                    align: payload_align,
4012                }),
4013                stream,
4014                address,
4015                count,
4016            )
4017            .map(|result| result.encode())
4018    }
4019
4020    fn stream_drop_writable(
4021        &mut self,
4022        instance: Instance,
4023        ty: TypeStreamTableIndex,
4024        writer: u32,
4025    ) -> Result<()> {
4026        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4027    }
4028
4029    fn error_context_debug_message(
4030        &mut self,
4031        instance: Instance,
4032        ty: TypeComponentLocalErrorContextTableIndex,
4033        options: OptionsIndex,
4034        err_ctx_handle: u32,
4035        debug_msg_address: u32,
4036    ) -> Result<()> {
4037        instance.error_context_debug_message(
4038            StoreContextMut(self),
4039            ty,
4040            options,
4041            err_ctx_handle,
4042            debug_msg_address,
4043        )
4044    }
4045
4046    fn thread_new_indirect(
4047        &mut self,
4048        instance: Instance,
4049        caller: RuntimeComponentInstanceIndex,
4050        func_ty_idx: TypeFuncIndex,
4051        start_func_table_idx: RuntimeTableIndex,
4052        start_func_idx: u32,
4053        context: i32,
4054    ) -> Result<u32> {
4055        instance.thread_new_indirect(
4056            StoreContextMut(self),
4057            caller,
4058            func_ty_idx,
4059            start_func_table_idx,
4060            start_func_idx,
4061            context,
4062        )
4063    }
4064}
4065
4066type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4067
4068/// Represents the state of a pending host task.
4069///
4070/// A host task is created when the guest calls into the host.
4071struct HostTask {
4072    common: WaitableCommon,
4073
4074    /// Guest thread which called the host.
4075    caller: QualifiedThreadId,
4076
4077    /// State of borrows/etc the host needs to track. Used when the guest passes
4078    /// borrows to the host, for example.
4079    call_context: CallContext,
4080
4081    /// For host tasks which end up doing asynchronous work (e.g. calls which
4082    /// were async-lowered and didn't complete on the first poll), this handle
4083    /// is used to cancel the future, which resides in the store's
4084    /// `FuturesUnordered`.
4085    join_handle: Option<JoinHandle>,
4086
4087    /// The type-erased (`Box<dyn Any>`) result of this host task, if any.
4088    result: Option<LiftedResult>,
4089}
4090
4091impl HostTask {
4092    fn new(caller: QualifiedThreadId) -> Self {
4093        Self {
4094            common: WaitableCommon::default(),
4095            call_context: CallContext::default(),
4096            caller,
4097            join_handle: None,
4098            result: None,
4099        }
4100    }
4101}
4102
4103impl TableDebug for HostTask {
4104    fn type_name() -> &'static str {
4105        "HostTask"
4106    }
4107}
4108
4109type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4110
4111/// Represents the caller of a given guest task.
4112enum Caller {
4113    /// The host called the guest task.
4114    Host {
4115        /// If present, may be used to deliver the result.
4116        tx: Option<oneshot::Sender<LiftedResult>>,
4117        /// Channel to notify once all subtasks spawned by this caller have
4118        /// completed.
4119        ///
4120        /// Note that we'll never actually send anything to this channel;
4121        /// dropping it when the refcount goes to zero is sufficient to notify
4122        /// the receiver.
4123        exit_tx: Arc<oneshot::Sender<()>>,
4124        /// If true, there's a host future that must be dropped before the task
4125        /// can be deleted.
4126        host_future_present: bool,
4127        /// Represents the caller of the host function which called back into a
4128        /// guest. Note that this thread could belong to an entirely unrelated
4129        /// top-level component instance from the one the host called into.
4130        caller: CurrentThread,
4131    },
4132    /// Another guest thread called the guest task.
4133    Guest {
4134        /// The ID of the calling thread.
4135        thread: QualifiedThreadId,
4136    },
4137}
4138
4139/// Represents a closure and related canonical ABI parameters required to
4140/// validate a `task.return` call at runtime and lift the result.
4141struct LiftResult {
4142    lift: RawLift,
4143    ty: TypeTupleIndex,
4144    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4145    string_encoding: StringEncoding,
4146}
4147
4148/// The table ID for a guest thread, qualified by the task to which it belongs.
4149///
4150/// This exists to minimize table lookups and the necessity to pass stores around mutably
4151/// for the common case of identifying the task to which a thread belongs.
4152#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4153struct QualifiedThreadId {
4154    task: TableId<GuestTask>,
4155    thread: TableId<GuestThread>,
4156}
4157
4158impl QualifiedThreadId {
4159    fn qualify(
4160        state: &mut ConcurrentState,
4161        thread: TableId<GuestThread>,
4162    ) -> Result<QualifiedThreadId> {
4163        Ok(QualifiedThreadId {
4164            task: state.get_mut(thread)?.parent_task,
4165            thread,
4166        })
4167    }
4168}
4169
4170impl fmt::Debug for QualifiedThreadId {
4171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4172        f.debug_tuple("QualifiedThreadId")
4173            .field(&self.task.rep())
4174            .field(&self.thread.rep())
4175            .finish()
4176    }
4177}
4178
4179enum GuestThreadState {
4180    NotStartedImplicit,
4181    NotStartedExplicit(
4182        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4183    ),
4184    Running,
4185    Suspended(StoreFiber<'static>),
4186    Pending,
4187    Completed,
4188}

4189pub struct GuestThread {
4190    /// Context-local state used to implement the `context.{get,set}`
4191    /// intrinsics.
4192    context: [u32; 2],
4193    /// The owning guest task.
4194    parent_task: TableId<GuestTask>,
4195    /// If present, indicates that the thread is currently waiting on the
4196    /// specified set but may be cancelled and woken immediately.
4197    wake_on_cancel: Option<TableId<WaitableSet>>,
4198    /// The execution state of this guest thread.
4199    state: GuestThreadState,
4200    /// The index of this thread in the component instance's handle table.
4201    /// This must always be `Some` after initialization.
4202    instance_rep: Option<u32>,
4203}
4204
4205impl GuestThread {
4206    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4207    /// handle.
4208    fn from_instance(
4209        state: Pin<&mut ComponentInstance>,
4210        caller_instance: RuntimeComponentInstanceIndex,
4211        guest_thread: u32,
4212    ) -> Result<TableId<Self>> {
4213        let rep = state.instance_states().0[caller_instance]
4214            .thread_handle_table()
4215            .guest_thread_rep(guest_thread)?;
4216        Ok(TableId::new(rep))
4217    }
4218
4219    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4220        Self {
4221            context: [0; 2],
4222            parent_task,
4223            wake_on_cancel: None,
4224            state: GuestThreadState::NotStartedImplicit,
4225            instance_rep: None,
4226        }
4227    }
4228
4229    fn new_explicit(
4230        parent_task: TableId<GuestTask>,
4231        start_func: Box<
4232            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4233        >,
4234    ) -> Self {
4235        Self {
4236            context: [0; 2],
4237            parent_task,
4238            wake_on_cancel: None,
4239            state: GuestThreadState::NotStartedExplicit(start_func),
4240            instance_rep: None,
4241        }
4242    }
4243}
4244
4245impl TableDebug for GuestThread {
4246    fn type_name() -> &'static str {
4247        "GuestThread"
4248    }
4249}
4250
4251enum SyncResult {
4252    NotProduced,
4253    Produced(Option<ValRaw>),
4254    Taken,
4255}
4256
4257impl SyncResult {
4258    fn take(&mut self) -> Option<Option<ValRaw>> {
4259        match mem::replace(self, SyncResult::Taken) {
4260            SyncResult::NotProduced => None,
4261            SyncResult::Produced(val) => Some(val),
4262            SyncResult::Taken => {
4263                panic!("attempted to take a synchronous result that was already taken")
4264            }
4265        }
4266    }
4267}
4268
4269#[derive(Debug)]
4270enum HostFutureState {
4271    NotApplicable,
4272    Live,
4273    Dropped,
4274}
4275
4276/// Represents a pending guest task.
4277pub(crate) struct GuestTask {
4278    /// See `WaitableCommon`
4279    common: WaitableCommon,
4280    /// Closure to lower the parameters passed to this task.
4281    lower_params: Option<RawLower>,
4282    /// See `LiftResult`
4283    lift_result: Option<LiftResult>,
4284    /// A place to stash the type-erased lifted result if it can't be delivered
4285    /// immediately.
4286    result: Option<LiftedResult>,
4287    /// Closure to call the callback function for an async-lifted export, if
4288    /// provided.
4289    callback: Option<CallbackFn>,
4290    /// See `Caller`
4291    caller: Caller,
4292    /// Borrow state for this task.
4293    ///
4294    /// Keeps track of `borrow<T>` handles passed to this task to ensure that
4295    /// they are all dropped by the time it exits.
4296    call_context: CallContext,
4297    /// A place to stash the lowered result for a sync-to-async call until it
4298    /// can be returned to the caller.
4299    sync_result: SyncResult,
4300    /// Whether or not the task has been cancelled (i.e. whether the task is
4301    /// permitted to call `task.cancel`).
4302    cancel_sent: bool,
4303    /// Whether or not we've sent a `Status::Starting` event to any current or
4304    /// future waiters for this waitable.
4305    starting_sent: bool,
4306    /// Pending guest subtasks created by this task (directly or indirectly).
4307    ///
4308    /// This is used to re-parent subtasks which are still running when their
4309    /// parent task is disposed.
4310    subtasks: HashSet<TableId<GuestTask>>,
4311    /// Scratch waitable set used to watch subtasks during synchronous calls.
4312    sync_call_set: TableId<WaitableSet>,
4313    /// The runtime instance to which the exported function for this guest task
4314    /// belongs.
4315    ///
4316    /// Note that the task may do a sync->sync call via a fused adapter which
4317    /// results in that task executing code in a different instance, and it may
4318    /// call host functions and intrinsics from that other instance.
4319    instance: RuntimeInstance,
4320    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4321    /// delivered to this task.
4322    event: Option<Event>,
4323    /// Whether or not the task has exited.
4324    exited: bool,
4325    /// Threads belonging to this task.
4326    threads: HashSet<TableId<GuestThread>>,
4327    /// The state of the host future representing this task, if any; that
4328    /// future must be dropped before we can delete the task.
4329    host_future_state: HostFutureState,
4330    /// Indicates whether this task was created for a call to an async-lifted
4331    /// export.
4332    async_function: bool,
4333}
4334
4335impl GuestTask {
4336    fn already_lowered_parameters(&self) -> bool {
4337        // We reset `lower_params` after we lower the parameters
4338        self.lower_params.is_none()
4339    }
4340
4341    fn returned_or_cancelled(&self) -> bool {
4342        // We reset `lift_result` after we return or exit
4343        self.lift_result.is_none()
4344    }
4345
4346    fn ready_to_delete(&self) -> bool {
4347        let threads_completed = self.threads.is_empty();
4348        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4349        let pending_completion_event = matches!(
4350            self.common.event,
4351            Some(Event::Subtask {
4352                status: Status::Returned | Status::ReturnCancelled
4353            })
4354        );
4355        let ready = threads_completed
4356            && !has_sync_result
4357            && !pending_completion_event
4358            && !matches!(self.host_future_state, HostFutureState::Live);
4359        log::trace!(
4360            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4361            threads_completed,
4362            has_sync_result,
4363            pending_completion_event,
4364            self.host_future_state
4365        );
4366        ready
4367    }
4368
4369    fn new(
4370        state: &mut ConcurrentState,
4371        lower_params: RawLower,
4372        lift_result: LiftResult,
4373        caller: Caller,
4374        callback: Option<CallbackFn>,
4375        instance: RuntimeInstance,
4376        async_function: bool,
4377    ) -> Result<Self> {
4378        let sync_call_set = state.push(WaitableSet::default())?;
4379        let host_future_state = match &caller {
4380            Caller::Guest { .. } => HostFutureState::NotApplicable,
4381            Caller::Host {
4382                host_future_present,
4383                ..
4384            } => {
4385                if *host_future_present {
4386                    HostFutureState::Live
4387                } else {
4388                    HostFutureState::NotApplicable
4389                }
4390            }
4391        };
4392        Ok(Self {
4393            common: WaitableCommon::default(),
4394            lower_params: Some(lower_params),
4395            lift_result: Some(lift_result),
4396            result: None,
4397            callback,
4398            caller,
4399            call_context: CallContext::default(),
4400            sync_result: SyncResult::NotProduced,
4401            cancel_sent: false,
4402            starting_sent: false,
4403            subtasks: HashSet::new(),
4404            sync_call_set,
4405            instance,
4406            event: None,
4407            exited: false,
4408            threads: HashSet::new(),
4409            host_future_state,
4410            async_function,
4411        })
4412    }
4413
4414    /// Dispose of this guest task, reparenting any pending subtasks to the
4415    /// caller.
4416    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4417        // If there are not-yet-delivered completion events for subtasks in
4418        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4419        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4420            if let Some(Event::Subtask {
4421                status: Status::Returned | Status::ReturnCancelled,
4422            }) = waitable.common(state)?.event
4423            {
4424                waitable.delete_from(state)?;
4425            }
4426        }
4427
4428        assert!(self.threads.is_empty());
4429
4430        state.delete(self.sync_call_set)?;
4431
4432        // Reparent any pending subtasks to the caller.
4433        match &self.caller {
4434            Caller::Guest { thread } => {
4435                let task_mut = state.get_mut(thread.task)?;
4436                let present = task_mut.subtasks.remove(&me);
4437                assert!(present);
4438
4439                for subtask in &self.subtasks {
4440                    task_mut.subtasks.insert(*subtask);
4441                }
4442
4443                for subtask in &self.subtasks {
4444                    state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4445                }
4446            }
4447            Caller::Host {
4448                exit_tx, caller, ..
4449            } => {
4450                for subtask in &self.subtasks {
4451                    state.get_mut(*subtask)?.caller = Caller::Host {
4452                        tx: None,
4453                        // Clone `exit_tx` to ensure that it is only dropped
4454                        // once all transitive subtasks of the host call have
4455                        // exited:
4456                        exit_tx: exit_tx.clone(),
4457                        host_future_present: false,
4458                        caller: *caller,
4459                    };
4460                }
4461            }
4462        }
4463
4464        for subtask in self.subtasks {
4465            let task = state.get_mut(subtask)?;
4466            if task.exited && task.ready_to_delete() {
4467                Waitable::Guest(subtask).delete_from(state)?;
4468            }
4469        }
4470
4471        Ok(())
4472    }
4473}
4474
4475impl TableDebug for GuestTask {
4476    fn type_name() -> &'static str {
4477        "GuestTask"
4478    }
4479}
4480
4481/// Represents state common to all kinds of waitables.
4482#[derive(Default)]
4483struct WaitableCommon {
4484    /// The currently pending event for this waitable, if any.
4485    event: Option<Event>,
4486    /// The set to which this waitable belongs, if any.
4487    set: Option<TableId<WaitableSet>>,
4488    /// The handle with which the guest refers to this waitable, if any.
4489    handle: Option<u32>,
4490}
4491
4492/// Represents a Component Model Async `waitable`.
4493#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4494enum Waitable {
4495    /// A host task
4496    Host(TableId<HostTask>),
4497    /// A guest task
4498    Guest(TableId<GuestTask>),
4499    /// The read or write end of a stream or future
4500    Transmit(TableId<TransmitHandle>),
4501}
4502
4503impl Waitable {
4504    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4505    /// handle.
4506    fn from_instance(
4507        state: Pin<&mut ComponentInstance>,
4508        caller_instance: RuntimeComponentInstanceIndex,
4509        waitable: u32,
4510    ) -> Result<Self> {
4511        use crate::runtime::vm::component::Waitable;
4512
4513        let (waitable, kind) = state.instance_states().0[caller_instance]
4514            .handle_table()
4515            .waitable_rep(waitable)?;
4516
4517        Ok(match kind {
4518            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4519            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4520            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4521        })
4522    }
4523
4524    /// Retrieve the host-visible identifier for this `Waitable`.
4525    fn rep(&self) -> u32 {
4526        match self {
4527            Self::Host(id) => id.rep(),
4528            Self::Guest(id) => id.rep(),
4529            Self::Transmit(id) => id.rep(),
4530        }
4531    }
4532
4533    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4534    /// remove it from any set it may currently belong to (when `set` is
4535    /// `None`).
4536    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4537        log::trace!("waitable {self:?} join set {set:?}");
4538
4539        let old = mem::replace(&mut self.common(state)?.set, set);
4540
4541        if let Some(old) = old {
4542            match *self {
4543                Waitable::Host(id) => state.remove_child(id, old),
4544                Waitable::Guest(id) => state.remove_child(id, old),
4545                Waitable::Transmit(id) => state.remove_child(id, old),
4546            }?;
4547
4548            state.get_mut(old)?.ready.remove(self);
4549        }
4550
4551        if let Some(set) = set {
4552            match *self {
4553                Waitable::Host(id) => state.add_child(id, set),
4554                Waitable::Guest(id) => state.add_child(id, set),
4555                Waitable::Transmit(id) => state.add_child(id, set),
4556            }?;
4557
4558            if self.common(state)?.event.is_some() {
4559                self.mark_ready(state)?;
4560            }
4561        }
4562
4563        Ok(())
4564    }
4565
4566    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4567    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4568        Ok(match self {
4569            Self::Host(id) => &mut state.get_mut(*id)?.common,
4570            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4571            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4572        })
4573    }
4574
4575    /// Set or clear the pending event for this waitable and either deliver it
4576    /// to the first waiter, if any, or mark it as ready to be delivered to the
4577    /// next waiter that arrives.
4578    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4579        log::trace!("set event for {self:?}: {event:?}");
4580        self.common(state)?.event = event;
4581        self.mark_ready(state)
4582    }
4583
4584    /// Take the pending event from this waitable, leaving `None` in its place.
4585    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4586        let common = self.common(state)?;
4587        let event = common.event.take();
4588        if let Some(set) = self.common(state)?.set {
4589            state.get_mut(set)?.ready.remove(self);
4590        }
4591
4592        Ok(event)
4593    }
4594
4595    /// Deliver the current event for this waitable to the first waiter, if any,
4596    /// or else mark it as ready to be delivered to the next waiter that
4597    /// arrives.
4598    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4599        if let Some(set) = self.common(state)?.set {
4600            state.get_mut(set)?.ready.insert(*self);
4601            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4602                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4603                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4604
4605                let item = match mode {
4606                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4607                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4608                        thread,
4609                        kind: GuestCallKind::DeliverEvent {
4610                            instance,
4611                            set: Some(set),
4612                        },
4613                    }),
4614                };
4615                state.push_high_priority(item);
4616            }
4617        }
4618        Ok(())
4619    }
4620
4621    /// Remove this waitable from the concurrent state's table.
4622    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4623        match self {
4624            Self::Host(task) => {
4625                log::trace!("delete host task {task:?}");
4626                state.delete(*task)?;
4627            }
4628            Self::Guest(task) => {
4629                log::trace!("delete guest task {task:?}");
4630                state.delete(*task)?.dispose(state, *task)?;
4631            }
4632            Self::Transmit(task) => {
4633                state.delete(*task)?;
4634            }
4635        }
4636
4637        Ok(())
4638    }
4639}
4640
4641impl fmt::Debug for Waitable {
4642    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4643        match self {
4644            Self::Host(id) => write!(f, "{id:?}"),
4645            Self::Guest(id) => write!(f, "{id:?}"),
4646            Self::Transmit(id) => write!(f, "{id:?}"),
4647        }
4648    }
4649}
4650
4651/// Represents a Component Model Async `waitable-set`.
4652#[derive(Default)]
4653struct WaitableSet {
4654    /// Which waitables in this set have pending events, if any.
4655    ready: BTreeSet<Waitable>,
4656    /// Which guest threads are currently waiting on this set, if any.
4657    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4658}
4659
4660impl TableDebug for WaitableSet {
4661    fn type_name() -> &'static str {
4662        "WaitableSet"
4663    }
4664}
4665
4666/// Type-erased closure to lower the parameters for a guest task.
4667type RawLower =
4668    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4669
4670/// Type-erased closure to lift the result for a guest task.
4671type RawLift = Box<
4672    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4673>;
4674
4675/// Type-erased result of a guest task which may be downcast to the expected
4676/// type by a host caller (or simply ignored in the case of a guest caller; see
4677/// `DummyResult`).
4678type LiftedResult = Box<dyn Any + Send + Sync>;
4679
4680/// Used to return a result from a `LiftFn` when the actual result has already
4681/// been lowered to a guest task's stack and linear memory.
4682struct DummyResult;
4683
4684/// Represents the Component Model Async state of a (sub-)component instance.
4685#[derive(Default)]
4686pub struct ConcurrentInstanceState {
4687    /// Backpressure count for this instance (backpressure is enabled while >0).
4688    backpressure: u16,
4689    /// Whether this instance must not currently be entered.
4690    do_not_enter: bool,
4691    /// Pending calls for this instance which require `Self::backpressure` to be
4692    /// zero and/or `Self::do_not_enter` to be `false` before they can proceed.
4693    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4694}
4695
4696impl ConcurrentInstanceState {
4697    pub fn pending_is_empty(&self) -> bool {
4698        self.pending.is_empty()
4699    }
4700}
4701
4702#[derive(Debug, Copy, Clone)]
4703enum CurrentThread {
4704    Guest(QualifiedThreadId),
4705    Host(TableId<HostTask>),
4706    None,
4707}
4708
4709impl CurrentThread {
4710    fn guest(&self) -> Option<&QualifiedThreadId> {
4711        match self {
4712            Self::Guest(id) => Some(id),
4713            _ => None,
4714        }
4715    }
4716
4717    fn host(&self) -> Option<TableId<HostTask>> {
4718        match self {
4719            Self::Host(id) => Some(*id),
4720            _ => None,
4721        }
4722    }
4723
4724    fn is_none(&self) -> bool {
4725        matches!(self, Self::None)
4726    }
4727}
4728
4729impl From<QualifiedThreadId> for CurrentThread {
4730    fn from(id: QualifiedThreadId) -> Self {
4731        Self::Guest(id)
4732    }
4733}
4734
4735impl From<TableId<HostTask>> for CurrentThread {
4736    fn from(id: TableId<HostTask>) -> Self {
4737        Self::Host(id)
4738    }
4739}
4740
4741/// Represents the Component Model Async state of a store.
4742pub struct ConcurrentState {
4743    /// The currently running thread, if any.
4744    current_thread: CurrentThread,
4745
4746    /// The set of pending host and background tasks, if any.
4747    ///
4748    /// See `ComponentInstance::poll_until` for where we temporarily take this
4749    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4750    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4751    /// The table of waitables, waitable sets, etc.
4752    table: AlwaysMut<ResourceTable>,
4753    /// The "high priority" work queue for this store's event loop.
4754    high_priority: Vec<WorkItem>,
4755    /// The "low priority" work queue for this store's event loop.
4756    low_priority: VecDeque<WorkItem>,
4757    /// A place to stash the reason a fiber is suspending so that the code which
4758    /// resumed it will know under what conditions the fiber should be resumed
4759    /// again.
4760    suspend_reason: Option<SuspendReason>,
4761    /// A cached fiber which is waiting for work to do.
4762    ///
4763    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4764    worker: Option<StoreFiber<'static>>,
4765    /// A place to stash the work item for which we're resuming a worker fiber.
4766    worker_item: Option<WorkerItem>,
4767
4768    /// Reference counts for all component error contexts
4769    ///
4770    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
4771    /// (sub)component ref counts as tracked by `error_context_tables`, for
4772    /// example when the host holds one or more references to error contexts.
4773    ///
4774    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4775    /// component-wide representation) of the index into concurrent state for a given
4776    /// stored `ErrorContext`.
4777    ///
4778    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4779    /// as a `TableId<ErrorContextState>`.
4780    global_error_context_ref_counts:
4781        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4782}
4783
4784impl Default for ConcurrentState {
4785    fn default() -> Self {
4786        Self {
4787            current_thread: CurrentThread::None,
4788            table: AlwaysMut::new(ResourceTable::new()),
4789            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4790            high_priority: Vec::new(),
4791            low_priority: VecDeque::new(),
4792            suspend_reason: None,
4793            worker: None,
4794            worker_item: None,
4795            global_error_context_ref_counts: BTreeMap::new(),
4796        }
4797    }
4798}
4799
4800impl ConcurrentState {
4801    /// Take ownership of any fibers and futures owned by this object.
4802    ///
4803    /// This should be used when disposing of the `Store` containing this object
4804    /// in order to gracefully resolve any and all fibers using
4805    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4806    /// use-after-free bugs due to fibers which may still have access to the
4807    /// `Store`.
4808    ///
4809    /// Additionally, the futures collected with this function should be dropped
4810    /// within a `tls::set` call, which will ensure that any futures closing
4811    /// over an `&Accessor` will have access to the store when dropped, allowing
4812    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4813    /// panicking.
4814    ///
4815    /// Note that this will leave the object in an inconsistent and unusable
4816    /// state, so it should only be used just prior to dropping it.
4817    pub(crate) fn take_fibers_and_futures(
4818        &mut self,
4819        fibers: &mut Vec<StoreFiber<'static>>,
4820        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4821    ) {
4822        for entry in self.table.get_mut().iter_mut() {
4823            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4824                for mode in mem::take(&mut set.waiting).into_values() {
4825                    if let WaitMode::Fiber(fiber) = mode {
4826                        fibers.push(fiber);
4827                    }
4828                }
4829            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4830                if let GuestThreadState::Suspended(fiber) =
4831                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4832                {
4833                    fibers.push(fiber);
4834                }
4835            }
4836        }
4837
4838        if let Some(fiber) = self.worker.take() {
4839            fibers.push(fiber);
4840        }
4841
4842        let mut handle_item = |item| match item {
4843            WorkItem::ResumeFiber(fiber) => {
4844                fibers.push(fiber);
4845            }
4846            WorkItem::PushFuture(future) => {
4847                self.futures
4848                    .get_mut()
4849                    .as_mut()
4850                    .unwrap()
4851                    .push(future.into_inner());
4852            }
4853            _ => {}
4854        };
4855
4856        for item in mem::take(&mut self.high_priority) {
4857            handle_item(item);
4858        }
4859        for item in mem::take(&mut self.low_priority) {
4860            handle_item(item);
4861        }
4862
4863        if let Some(them) = self.futures.get_mut().take() {
4864            futures.push(them);
4865        }
4866    }
4867
4868    /// Collect the next set of work items to run. This will be either all
4869    /// high-priority items, or a single low-priority item if there are no
4870    /// high-priority items.
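    ///
    /// For example (an illustrative schedule, not a real trace): with
    /// high-priority items `[A, B]` and low-priority items `[C, D]` queued, one
    /// call returns `[A, B]`; once the high-priority queue is empty, subsequent
    /// calls return `[C]` and then `[D]`, since low-priority items are pushed to
    /// the front of the `VecDeque` and popped from the back (FIFO).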
4871    fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4872        let mut ready = mem::take(&mut self.high_priority);
4873        if ready.is_empty() {
4874            if let Some(item) = self.low_priority.pop_back() {
4875                ready.push(item);
4876            }
4877        }
4878        ready
4879    }
4880
4881    fn push<V: Send + Sync + 'static>(
4882        &mut self,
4883        value: V,
4884    ) -> Result<TableId<V>, ResourceTableError> {
4885        self.table.get_mut().push(value).map(TableId::from)
4886    }
4887
4888    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4889        self.table.get_mut().get_mut(&Resource::from(id))
4890    }
4891
4892    pub fn add_child<T: 'static, U: 'static>(
4893        &mut self,
4894        child: TableId<T>,
4895        parent: TableId<U>,
4896    ) -> Result<(), ResourceTableError> {
4897        self.table
4898            .get_mut()
4899            .add_child(Resource::from(child), Resource::from(parent))
4900    }
4901
4902    pub fn remove_child<T: 'static, U: 'static>(
4903        &mut self,
4904        child: TableId<T>,
4905        parent: TableId<U>,
4906    ) -> Result<(), ResourceTableError> {
4907        self.table
4908            .get_mut()
4909            .remove_child(Resource::from(child), Resource::from(parent))
4910    }
4911
4912    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4913        self.table.get_mut().delete(Resource::from(id))
4914    }
4915
4916    fn push_future(&mut self, future: HostTaskFuture) {
4917        // Note that we can't directly push to `ConcurrentState::futures` here
4918        // since this may be called from a future that's being polled inside
4919        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4920        // so it has exclusive access while polling it.  Therefore, we push a
4921        // work item to the "high priority" queue, which will actually push to
4922        // `ConcurrentState::futures` later.
4923        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4924    }
4925
4926    fn push_high_priority(&mut self, item: WorkItem) {
4927        log::trace!("push high priority: {item:?}");
4928        self.high_priority.push(item);
4929    }
4930
4931    fn push_low_priority(&mut self, item: WorkItem) {
4932        log::trace!("push low priority: {item:?}");
4933        self.low_priority.push_front(item);
4934    }
4935
4936    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4937        if high_priority {
4938            self.push_high_priority(item);
4939        } else {
4940            self.push_low_priority(item);
4941        }
4942    }
4943
4944    /// Implements the `context.get` intrinsic.
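    ///
    /// Each guest thread carries two `u32` context slots (see
    /// `GuestThread::context`); for example, after `context_set(1, 0xABCD)` a
    /// later `context_get(1)` on the same thread yields `0xABCD`, while other
    /// threads' slots are unaffected.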
4945    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4946        let thread = self.unwrap_current_guest_thread();
4947        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4948        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4949        Ok(val)
4950    }
4951
4952    /// Implements the `context.set` intrinsic.
4953    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4954        let thread = self.unwrap_current_guest_thread();
4955        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4956        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4957        Ok(())
4958    }
4959
4960    /// Returns whether there's a pending cancellation on the current guest thread,
4961    /// consuming the event if so.
4962    fn take_pending_cancellation(&mut self) -> bool {
4963        let thread = self.unwrap_current_guest_thread();
4964        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4965            assert!(matches!(event, Event::Cancelled));
4966            true
4967        } else {
4968            false
4969        }
4970    }
4971
4972    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4973        if self.may_block(task) {
4974            Ok(())
4975        } else {
4976            Err(Trap::CannotBlockSyncTask.into())
4977        }
4978    }
4979
4980    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4981        let task = self.get_mut(task).unwrap();
4982        task.async_function || task.returned_or_cancelled()
4983    }
4984
4985    /// Used by `ResourceTables` to acquire the current `CallContext` for the
4986    /// specified task.
4987    ///
4988    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
4989    /// below.
4990    pub fn call_context(&mut self, task: u32) -> &mut CallContext {
4991        let (task, is_host) = (task >> 1, task & 1 == 1);
4992        if is_host {
4993            let task: TableId<HostTask> = TableId::new(task);
4994            &mut self.get_mut(task).unwrap().call_context
4995        } else {
4996            let task: TableId<GuestTask> = TableId::new(task);
4997            &mut self.get_mut(task).unwrap().call_context
4998        }
4999    }
5000
5001    /// Used by `ResourceTables` to record the scope of a borrow so that it can
5002    /// be undone later.
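    ///
    /// The returned value packs the current task's rep together with a "host
    /// task" flag in the low bit. For example (illustrative numbers only), a
    /// guest task with rep `5` yields `(5 << 1) | 0 == 10`, while a host task
    /// with rep `5` yields `(5 << 1) | 1 == 11`; `Self::call_context` reverses
    /// this packing.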
5003    pub fn current_call_context_scope_id(&self) -> u32 {
5004        let (bits, is_host) = match self.current_thread {
5005            CurrentThread::Guest(id) => (id.task.rep(), false),
5006            CurrentThread::Host(id) => (id.rep(), true),
5007            CurrentThread::None => unreachable!(),
5008        };
5009        assert_eq!((bits << 1) >> 1, bits);
5010        (bits << 1) | u32::from(is_host)
5011    }
5012
5013    fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5014        *self.current_thread.guest().unwrap()
5015    }
5016
5017    fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5018        self.current_thread.host().unwrap()
5019    }
5020}
5021
5022/// Provide a type hint to the compiler about the shape of a parameter-lowering
5023/// closure.
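///
/// This is the usual identity-function trick for closure type inference:
/// routing the closure through `for_any_lower` fixes its argument types (and
/// the implied higher-ranked lifetimes) so that the surrounding
/// `Box::new(...)` in `prepare_call` coerces cleanly to `RawLower`.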
5024fn for_any_lower<
5025    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5026>(
5027    fun: F,
5028) -> F {
5029    fun
5030}
5031
5032/// Provide a type hint to the compiler about the shape of a result-lifting closure.
5033fn for_any_lift<
5034    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5035>(
5036    fun: F,
5037) -> F {
5038    fun
5039}
5040
5041/// Wrap the specified future in a `poll_fn` which asserts that the future is
5042/// only polled from the event loop of the specified `Store`.
5043///
5044/// See `StoreContextMut::run_concurrent` for details.
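///
/// A rough, hypothetical sketch of the invariant being enforced (names and the
/// exact `run_concurrent` signature here are illustrative; see the public
/// `call_concurrent`/`run_concurrent` APIs for the real entry points):
///
/// ```ignore
/// // `call_future` was returned by a concurrent call and closes over
/// // store-owned state, so it must be awaited from within the store's own
/// // event loop...
/// store.run_concurrent(async move |_| call_future.await).await;
/// // ...whereas polling it directly on an arbitrary executor hits the panic
/// // in the `poll_fn` below.
/// ```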
5045fn checked<F: Future + Send + 'static>(
5046    id: StoreId,
5047    fut: F,
5048) -> impl Future<Output = F::Output> + Send + 'static {
5049    async move {
5050        let mut fut = pin!(fut);
5051        future::poll_fn(move |cx| {
5052            let message = "\
5053                `Future`s which depend on asynchronous component tasks, streams, or \
5054                futures to complete may only be polled from the event loop of the \
5055                store to which they belong.  Please use \
5056                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5057            ";
5058            tls::try_get(|store| {
5059                let matched = match store {
5060                    tls::TryGet::Some(store) => store.id() == id,
5061                    tls::TryGet::Taken | tls::TryGet::None => false,
5062                };
5063
5064                if !matched {
5065                    panic!("{message}")
5066                }
5067            });
5068            fut.as_mut().poll(cx)
5069        })
5070        .await
5071    }
5072}
5073
5074/// Assert that `StoreContextMut::run_concurrent` has not been called from
5075/// within a store's event loop.
5076fn check_recursive_run() {
5077    tls::try_get(|store| {
5078        if !matches!(store, tls::TryGet::None) {
5079            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5080        }
5081    });
5082}
5083
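/// Split a packed callback return code into its low four bits (the event code)
/// and the remaining upper bits (the accompanying payload, e.g. a waitable-set
/// index), per the Component Model async callback ABI.
///
/// For example (numbers chosen purely for illustration):
/// `unpack_callback_code(0x21)` returns `(1, 2)`, i.e. code `1` with payload `2`.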
5084fn unpack_callback_code(code: u32) -> (u32, u32) {
5085    (code & 0xF, code >> 4)
5086}
5087
5088/// Helper struct for packaging parameters to be passed to
5089/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5090/// `waitable-set.poll`.
5091struct WaitableCheckParams {
5092    set: TableId<WaitableSet>,
5093    options: OptionsIndex,
5094    payload: u32,
5095}
5096
5097/// Indicates whether `ComponentInstance::waitable_check` is being called for
5098/// `waitable-set.wait` or `waitable-set.poll`.
5099enum WaitableCheck {
5100    Wait,
5101    Poll,
5102}
5103
5104/// Represents a guest task called from the host, prepared using `prepare_call`.
5105pub(crate) struct PreparedCall<R> {
5106    /// The guest export to be called
5107    handle: Func,
5108    /// The guest thread created by `prepare_call`
5109    thread: QualifiedThreadId,
5110    /// The number of lowered core Wasm parameters to pass to the call.
5111    param_count: usize,
5112    /// The `oneshot::Receiver` to which the result of the call will be
5113    /// delivered when it is available.
5114    rx: oneshot::Receiver<LiftedResult>,
5115    /// The `oneshot::Receiver` which will resolve when the task -- and any
5116    /// transitive subtasks -- have all exited.
5117    exit_rx: oneshot::Receiver<()>,
5118    _phantom: PhantomData<R>,
5119}
5120
5121impl<R> PreparedCall<R> {
5122    /// Get a copy of the `TaskId` for this `PreparedCall`.
5123    pub(crate) fn task_id(&self) -> TaskId {
5124        TaskId {
5125            task: self.thread.task,
5126        }
5127    }
5128}
5129
5130/// Represents a task created by `prepare_call`.
5131pub(crate) struct TaskId {
5132    task: TableId<GuestTask>,
5133}
5134
5135impl TaskId {
5136    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5137    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5138    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5139    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5140    /// and delete the task when all threads are done.
5141    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5142        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5143        if !task.already_lowered_parameters() {
5144            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5145        } else {
5146            task.host_future_state = HostFutureState::Dropped;
5147            if task.ready_to_delete() {
5148                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5149            }
5150        }
5151        Ok(())
5152    }
5153}
5154
5155/// Prepare a call to the specified exported Wasm function, providing functions
5156/// for lowering the parameters and lifting the result.
5157///
5158/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5159/// loop, use `queue_call`.
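///
/// Roughly how the two compose (a simplified, hypothetical sketch; the real
/// call sites live in the `call_concurrent` machinery):
///
/// ```ignore
/// let prepared = prepare_call(store.as_context_mut(), func, param_count,
///                             /* host_future_present */ true, lower, lift)?;
/// let call = queue_call(store.as_context_mut(), prepared)?;
/// // `call` resolves to the lifted result, but must only be awaited from
/// // within this store's event loop (e.g. via `StoreContextMut::run_concurrent`).
/// ```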
5160pub(crate) fn prepare_call<T, R>(
5161    mut store: StoreContextMut<T>,
5162    handle: Func,
5163    param_count: usize,
5164    host_future_present: bool,
5165    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5166    + Send
5167    + Sync
5168    + 'static,
5169    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5170    + Send
5171    + Sync
5172    + 'static,
5173) -> Result<PreparedCall<R>> {
5174    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5175
5176    let instance = handle.instance().id().get(store.0);
5177    let options = &instance.component().env_component().options[options];
5178    let ty = &instance.component().types()[ty];
5179    let async_function = ty.async_;
5180    let task_return_type = ty.results;
5181    let component_instance = raw_options.instance;
5182    let callback = options.callback.map(|i| instance.runtime_callback(i));
5183    let memory = options
5184        .memory()
5185        .map(|i| instance.runtime_memory(i))
5186        .map(SendSyncPtr::new);
5187    let string_encoding = options.string_encoding;
5188    let token = StoreToken::new(store.as_context_mut());
5189    let state = store.0.concurrent_state_mut();
5190
5191    let (tx, rx) = oneshot::channel();
5192    let (exit_tx, exit_rx) = oneshot::channel();
5193
5194    let instance = RuntimeInstance {
5195        instance: handle.instance().id().instance(),
5196        index: component_instance,
5197    };
5198    let caller = state.current_thread;
5199    let task = GuestTask::new(
5200        state,
5201        Box::new(for_any_lower(move |store, params| {
5202            lower_params(handle, token.as_context_mut(store), params)
5203        })),
5204        LiftResult {
5205            lift: Box::new(for_any_lift(move |store, result| {
5206                lift_result(handle, store, result)
5207            })),
5208            ty: task_return_type,
5209            memory,
5210            string_encoding,
5211        },
5212        Caller::Host {
5213            tx: Some(tx),
5214            exit_tx: Arc::new(exit_tx),
5215            host_future_present,
5216            caller,
5217        },
5218        callback.map(|callback| {
5219            let callback = SendSyncPtr::new(callback);
5220            let instance = handle.instance();
5221            Box::new(move |store: &mut dyn VMStore, event, handle| {
5222                let store = token.as_context_mut(store);
5223                // SAFETY: Per the contract of `prepare_call`, the callback
5224                // will remain valid at least as long as this task exists.
5225                unsafe { instance.call_callback(store, callback, event, handle) }
5226            }) as CallbackFn
5227        }),
5228        instance,
5229        async_function,
5230    )?;
5231
5232    let task = state.push(task)?;
5233    let thread = state.push(GuestThread::new_implicit(task))?;
5234    state.get_mut(task)?.threads.insert(thread);
5235
5236    if !store.0.may_enter(instance) {
5237        bail!(crate::Trap::CannotEnterComponent);
5238    }
5239
5240    Ok(PreparedCall {
5241        handle,
5242        thread: QualifiedThreadId { task, thread },
5243        param_count,
5244        rx,
5245        exit_rx,
5246        _phantom: PhantomData,
5247    })
5248}
5249
5250/// Queue a call previously prepared using `prepare_call` to be run as part of
5251/// the associated `ComponentInstance`'s event loop.
5252///
5253/// The returned future will resolve to the result once it is available, but
5254/// must only be polled via the instance's event loop. See
5255/// `StoreContextMut::run_concurrent` for details.
5256pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5257    mut store: StoreContextMut<T>,
5258    prepared: PreparedCall<R>,
5259) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5260    let PreparedCall {
5261        handle,
5262        thread,
5263        param_count,
5264        rx,
5265        exit_rx,
5266        ..
5267    } = prepared;
5268
5269    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5270
5271    Ok(checked(
5272        store.0.id(),
5273        rx.map(move |result| {
5274            result
5275                .map(|v| (*v.downcast().unwrap(), exit_rx))
5276                .map_err(crate::Error::from)
5277        }),
5278    ))
5279}
5280
5281/// Internal helper for `queue_call` which enqueues a call previously prepared
5282/// using `prepare_call` on the associated `ComponentInstance`'s event loop.
5283fn queue_call0<T: 'static>(
5284    store: StoreContextMut<T>,
5285    handle: Func,
5286    guest_thread: QualifiedThreadId,
5287    param_count: usize,
5288) -> Result<()> {
5289    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5290    let is_concurrent = raw_options.async_;
5291    let callback = raw_options.callback;
5292    let instance = handle.instance();
5293    let callee = handle.lifted_core_func(store.0);
5294    let post_return = handle.post_return_core_func(store.0);
5295    let callback = callback.map(|i| {
5296        let instance = instance.id().get(store.0);
5297        SendSyncPtr::new(instance.runtime_callback(i))
5298    });
5299
5300    log::trace!("queueing call {guest_thread:?}");
5301
5302    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5303    // (with signatures appropriate for this call) and will remain valid as
5304    // long as this instance is valid.
5305    unsafe {
5306        instance.queue_call(
5307            store,
5308            guest_thread,
5309            SendSyncPtr::new(callee),
5310            param_count,
5311            1,
5312            is_concurrent,
5313            callback,
5314            post_return.map(SendSyncPtr::new),
5315        )
5316    }
5317}