wasmtime/runtime/component/concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
//! - `StoreContextMut::run_concurrent`: Run the event loop for the store,
//! allowing any and all tasks belonging to that store to make progress (see
//! the sketch after this overview).
27//!
//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
//! for the store.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
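//!
//! The following is a rough, non-compiled sketch of how these pieces fit
//! together from an embedder's perspective, assuming the engine's
//! `Config::concurrency_support` is enabled; the `Ctx` type and the
//! `"hypothetical-export"` name are illustrative only:
//!
//! ```ignore
//! struct Ctx { /* embedder-specific host state */ }
//!
//! async fn example(
//!     mut store: wasmtime::Store<Ctx>,
//!     instance: wasmtime::component::Instance,
//! ) -> wasmtime::Result<()> {
//!     // Look up an exported function and call it from within the store's
//!     // event loop via `run_concurrent`.
//!     let func = instance.get_typed_func::<(u32,), (u32,)>(&mut store, "hypothetical-export")?;
//!     let result = store
//!         .run_concurrent(async |accessor| {
//!             // `call_concurrent` creates a guest task which is driven by the
//!             // event loop; other tasks may make progress while it's pending.
//!             func.call_concurrent(accessor, (42,)).await
//!         })
//!         .await??;
//!     println!("guest returned {result:?}");
//!     Ok(())
//! }
//! ```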
52
53use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64    bail, error::format_err,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::sync::Arc;
84use std::task::{Context, Poll, Waker};
85use std::vec::Vec;
86use table::{TableDebug, TableId};
87use wasmtime_environ::Trap;
88use wasmtime_environ::component::{
89    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
90    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
91    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
92    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
93    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
94};
95use wasmtime_environ::packed_option::ReservedValue;
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
113/// Constant defined in the Component Model spec to indicate that the async
114/// intrinsic (e.g. `future.write`) has not yet completed.
115const BLOCKED: u32 = 0xffff_ffff;
116
117/// Corresponds to `CallState` in the upstream spec.
118#[derive(Clone, Copy, Eq, PartialEq, Debug)]
119pub enum Status {
120    Starting = 0,
121    Started = 1,
122    Returned = 2,
123    StartCancelled = 3,
124    ReturnCancelled = 4,
125}
126
127impl Status {
128    /// Packs this status and the optional `waitable` provided into a 32-bit
129    /// result that the canonical ABI requires.
130    ///
131    /// The low 4 bits are reserved for the status while the upper 28 bits are
132    /// the waitable, if present.
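    ///
    /// For example, `Status::Started` is `1`, so packing it together with
    /// waitable handle `5` yields `(5 << 4) | 1`, i.e. `0x51` (illustrative
    /// sketch, not a compiled doctest):
    ///
    /// ```ignore
    /// assert_eq!(Status::Started.pack(Some(5)), 0x51);
    /// ```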
133    pub fn pack(self, waitable: Option<u32>) -> u32 {
134        assert!(matches!(self, Status::Returned) == waitable.is_none());
135        let waitable = waitable.unwrap_or(0);
136        assert!(waitable < (1 << 28));
137        (waitable << 4) | (self as u32)
138    }
139}
140
141/// Corresponds to `EventCode` in the Component Model spec, plus related payload
142/// data.
143#[derive(Clone, Copy, Debug)]
144enum Event {
145    None,
146    Cancelled,
147    Subtask {
148        status: Status,
149    },
150    StreamRead {
151        code: ReturnCode,
152        pending: Option<(TypeStreamTableIndex, u32)>,
153    },
154    StreamWrite {
155        code: ReturnCode,
156        pending: Option<(TypeStreamTableIndex, u32)>,
157    },
158    FutureRead {
159        code: ReturnCode,
160        pending: Option<(TypeFutureTableIndex, u32)>,
161    },
162    FutureWrite {
163        code: ReturnCode,
164        pending: Option<(TypeFutureTableIndex, u32)>,
165    },
166}
167
168impl Event {
169    /// Lower this event to core Wasm integers for delivery to the guest.
170    ///
171    /// Note that the waitable handle, if any, is assumed to be lowered
172    /// separately.
173    fn parts(self) -> (u32, u32) {
174        const EVENT_NONE: u32 = 0;
175        const EVENT_SUBTASK: u32 = 1;
176        const EVENT_STREAM_READ: u32 = 2;
177        const EVENT_STREAM_WRITE: u32 = 3;
178        const EVENT_FUTURE_READ: u32 = 4;
179        const EVENT_FUTURE_WRITE: u32 = 5;
180        const EVENT_CANCELLED: u32 = 6;
181        match self {
182            Event::None => (EVENT_NONE, 0),
183            Event::Cancelled => (EVENT_CANCELLED, 0),
184            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189        }
190    }
191}
192
193/// Corresponds to `CallbackCode` in the spec.
194mod callback_code {
195    pub const EXIT: u32 = 0;
196    pub const YIELD: u32 = 1;
197    pub const WAIT: u32 = 2;
198}
199
200/// A flag indicating that the callee is an async-lowered export.
201///
202/// This may be passed to the `async-start` intrinsic from a fused adapter.
203const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
205/// Provides access to either store data (via the `get` method) or the store
206/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
207/// instance to which the current host task belongs.
208///
209/// See [`Accessor::with`] for details.
210pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211    store: StoreContextMut<'a, T>,
212    get_data: fn(&mut T) -> D::Data<'_>,
213}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217    D: HasData + ?Sized,
218    T: 'static,
219{
220    /// Creates a new [`Access`] from its component parts.
221    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222        Self { store, get_data }
223    }
224
225    /// Get mutable access to the store data.
226    pub fn data_mut(&mut self) -> &mut T {
227        self.store.data_mut()
228    }
229
    /// Get access to the projected data `D::Data`, obtained from the store
    /// data via this accessor's getter.
231    pub fn get(&mut self) -> D::Data<'_> {
232        (self.get_data)(self.data_mut())
233    }
234
235    /// Spawn a background task.
236    ///
237    /// See [`Accessor::spawn`] for details.
238    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239    where
240        T: 'static,
241    {
242        let accessor = Accessor {
243            get_data: self.get_data,
244            token: StoreToken::new(self.store.as_context_mut()),
245        };
246        self.store
247            .as_context_mut()
248            .spawn_with_accessor(accessor, task)
249    }
250
251    /// Returns the getter this accessor is using to project from `T` into
252    /// `D::Data`.
253    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254        self.get_data
255    }
256}
257
258impl<'a, T, D> AsContext for Access<'a, T, D>
259where
260    D: HasData + ?Sized,
261    T: 'static,
262{
263    type Data = T;
264
265    fn as_context(&self) -> StoreContext<'_, T> {
266        self.store.as_context()
267    }
268}
269
270impl<'a, T, D> AsContextMut for Access<'a, T, D>
271where
272    D: HasData + ?Sized,
273    T: 'static,
274{
275    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276        self.store.as_context_mut()
277    }
278}
279
280/// Provides scoped mutable access to store data in the context of a concurrent
281/// host task future.
282///
283/// This allows multiple host task futures to execute concurrently and access
284/// the store between (but not across) `await` points.
285///
286/// # Rationale
287///
288/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
289/// `D::Data<'_>`. The problem this is solving, however, is that it does not
290/// literally store these values. The basic problem is that when a concurrent
291/// host future is being polled it has access to `&mut T` (and the whole
292/// `Store`) but when it's not being polled it does not have access to these
293/// values. This reflects how the store is only ever polling one future at a
294/// time so the store is effectively being passed between futures.
295///
296/// Rust's `Future` trait, however, has no means of passing a `Store`
297/// temporarily between futures. The [`Context`](std::task::Context) type does
298/// not have the ability to attach arbitrary information to it at this time.
299/// This type, [`Accessor`], is used to bridge this expressivity gap.
300///
301/// The [`Accessor`] type here represents the ability to acquire, temporarily in
302/// a synchronous manner, the current store. The [`Accessor::with`] function
303/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
304/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
305/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
307/// how the store is temporarily made available while a host future is being
308/// polled.
309///
310/// # Implementation
311///
312/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
313/// this type additionally doesn't even have a lifetime parameter. This is
314/// instead a representation of proof of the ability to acquire these while a
315/// future is being polled. Wasmtime will, when it polls a host future,
316/// configure ambient state such that the `Accessor` that a future closes over
317/// will work and be able to access the store.
318///
319/// This has a number of implications for users such as:
320///
/// * It's intentional that `Accessor` cannot be cloned; it needs to stay within
///   the lifetime of a single future.
323/// * A future is expected to, however, close over an `Accessor` and keep it
324///   alive probably for the duration of the entire future.
325/// * Different host futures will be given different `Accessor`s, and that's
326///   intentional.
327/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
328///   alleviates some otherwise required bounds to be written down.
329///
330/// # Using `Accessor` in `Drop`
331///
332/// The methods on `Accessor` are only expected to work in the context of
333/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
334/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended
/// not to use `Accessor` methods in anything connected to a `Drop`
/// implementation, as they may panic or have unintended results. If you run
/// into this, though, feel free to file an issue on the Wasmtime repository.
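///
/// # Example
///
/// A rough, non-compiled sketch of a concurrent host function which uses its
/// [`Accessor`] to touch host state between `await` points; the `Ctx` type,
/// the `fetch` helper, and the exact closure signature expected by
/// `func_wrap_concurrent` are illustrative assumptions here:
///
/// ```ignore
/// struct Ctx { hits: u64 }
///
/// linker.root().func_wrap_concurrent(
///     "hypothetical-import",
///     |accessor: &Accessor<Ctx>, (url,): (String,)| {
///         Box::pin(async move {
///             // Synchronous, scoped access to the store data...
///             accessor.with(|mut access| access.get().hits += 1);
///             // ...interleaved with awaiting other host work.
///             let body = fetch(&url).await?;
///             Ok((body,))
///         })
///     },
/// )?;
/// ```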
339pub struct Accessor<T: 'static, D = HasSelf<T>>
340where
341    D: HasData + ?Sized,
342{
343    token: StoreToken<T>,
344    get_data: fn(&mut T) -> D::Data<'_>,
345}
346
347/// A helper trait to take any type of accessor-with-data in functions.
348///
349/// This trait is similar to [`AsContextMut`] except that it's used when
350/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
351/// [`Accessor`] is the main type used in concurrent settings and is passed to
352/// functions such as [`Func::call_concurrent`].
353///
354/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
355/// this trait. This effectively means that regardless of the `D` in
356/// `Accessor<T, D>` it can still be passed to a function which just needs a
357/// store accessor.
358///
359/// Acquiring an [`Accessor`] can be done through
360/// [`StoreContextMut::run_concurrent`] for example or in a host function
361/// through
362/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
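///
/// # Example
///
/// A non-compiled sketch of a helper that accepts any accessor for a
/// hypothetical `Ctx` store data type, regardless of its `D` projection:
///
/// ```ignore
/// fn bump_counter(acc: impl AsAccessor<Data = Ctx>) {
///     // Works with an `Accessor<Ctx>`, an `Accessor<Ctx, SomeProjection>`,
///     // or a reference to either.
///     acc.as_accessor().with(|mut access| access.data_mut().counter += 1);
/// }
/// ```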
363pub trait AsAccessor {
364    /// The `T` in `Store<T>` that this accessor refers to.
365    type Data: 'static;
366
367    /// The `D` in `Accessor<T, D>`, or the projection out of
368    /// `Self::Data`.
369    type AccessorData: HasData + ?Sized;
370
371    /// Returns the accessor that this is referring to.
372    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376    type Data = T::Data;
377    type AccessorData = T::AccessorData;
378
379    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380        T::as_accessor(self)
381    }
382}
383
384impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385    type Data = T;
386    type AccessorData = D;
387
388    fn as_accessor(&self) -> &Accessor<T, D> {
389        self
390    }
391}
392
393// Note that it is intentional at this time that `Accessor` does not actually
394// store `&mut T` or anything similar. This distinctly enables the `Accessor`
395// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
396// that matter). This is used to ergonomically simplify bindings where the
// majority of the time `Accessor` is closed over in a future which then needs
// to be `Send` and `Sync`. Not storing `&mut T` avoids the need to write
// `T: Send` everywhere (on top of the already-required `T: 'static`).
400//
401// Note as well that `Accessor` doesn't actually store its data at all. Instead
402// it's more of a "proof" of what can be accessed from TLS. API design around
403// `Accessor` and functions like `Linker::func_wrap_concurrent` are
404// intentionally made to ensure that `Accessor` is ideally only used in the
405// context that TLS variables are actually set. For example host functions are
406// given `&Accessor`, not `Accessor`, and this prevents them from persisting
407// the value outside of a future. Within the future the TLS variables are all
408// guaranteed to be set while the future is being polled.
409//
410// Finally though this is not an ironclad guarantee, but nor does it need to be.
411// The TLS APIs are designed to panic or otherwise model usage where they're
412// called recursively or similar. It's hoped that code cannot be constructed to
413// actually hit this at runtime but this is not a safety requirement at this
414// time.
415const _: () = {
416    const fn assert<T: Send + Sync>() {}
417    assert::<Accessor<UnsafeCell<u32>>>();
418};
419
420impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by the specified
    /// `StoreToken`.
    ///
    /// The new accessor uses the identity function as its data projection; use
    /// [`Accessor::with_getter`] to "project" from the store's associated data
    /// to another type (e.g. a field of that data or a wrapper around it).
429    pub(crate) fn new(token: StoreToken<T>) -> Self {
430        Self {
431            token,
432            get_data: |x| x,
433        }
434    }
435}
436
437impl<T, D> Accessor<T, D>
438where
439    D: HasData + ?Sized,
440{
441    /// Run the specified closure, passing it mutable access to the store.
442    ///
443    /// This function is one of the main building blocks of the [`Accessor`]
444    /// type. This yields synchronous, blocking, access to the store via an
445    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
446    /// providing the ability to access `D` via [`Access::get`]. Note that the
447    /// `fun` here is given only temporary access to the store and `T`/`D`
448    /// meaning that the return value `R` here is not allowed to capture borrows
449    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
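    ///
    /// For example (illustrative sketch; the `Ctx` type and its `name` field
    /// are hypothetical):
    ///
    /// ```ignore
    /// // Clone the data out of the store so it can be used after `with` returns.
    /// let name: String = accessor.with(|mut access| access.get().name.clone());
    /// ```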
451    ///
452    /// # Panics
453    ///
    /// This function will panic if it is called recursively with any other
455    /// accessor already in scope. For example if `with` is called within `fun`,
456    /// then this function will panic. It is up to the embedder to ensure that
457    /// this does not happen.
458    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459        tls::get(|vmstore| {
460            fun(Access {
461                store: self.token.as_context_mut(vmstore),
462                get_data: self.get_data,
463            })
464        })
465    }
466
467    /// Returns the getter this accessor is using to project from `T` into
468    /// `D::Data`.
469    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470        self.get_data
471    }
472
473    /// Changes this accessor to access `D2` instead of the current type
474    /// parameter `D`.
475    ///
476    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
477    ///
478    /// # Panics
479    ///
480    /// When using this API the returned value is disconnected from `&self` and
481    /// the lifetime binding the `self` argument. An `Accessor` only works
482    /// within the context of the closure or async closure that it was
    /// originally given to, however. Because the returned value has no lifetime
    /// connection to the original accessor, it's possible to use it outside
    /// that context, which will panic.
486    ///
487    /// The returned value should only be used within the scope of the original
488    /// `Accessor` that `self` refers to.
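    ///
    /// A non-compiled sketch of projecting to a single field of the store data
    /// via a marker type implementing [`HasData`]; the `Ctx` and `Limits`
    /// types are hypothetical:
    ///
    /// ```ignore
    /// struct Ctx { table: ResourceTable, limits: Limits }
    ///
    /// struct HasLimits;
    /// impl HasData for HasLimits {
    ///     type Data<'a> = &'a mut Limits;
    /// }
    ///
    /// let limits = accessor.with_getter::<HasLimits>(|ctx| &mut ctx.limits);
    /// limits.with(|mut access| access.get().max_memory = 1 << 20);
    /// ```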
489    pub fn with_getter<D2: HasData>(
490        &self,
491        get_data: fn(&mut T) -> D2::Data<'_>,
492    ) -> Accessor<T, D2> {
493        Accessor {
494            token: self.token,
495            get_data,
496        }
497    }
498
499    /// Spawn a background task which will receive an `&Accessor<T, D>` and
500    /// run concurrently with any other tasks in progress for the current
501    /// store.
502    ///
503    /// This is particularly useful for host functions which return a `stream`
504    /// or `future` such that the code to write to the write end of that
505    /// `stream` or `future` must run after the function returns.
506    ///
507    /// The returned [`JoinHandle`] may be used to cancel the task.
508    ///
509    /// # Panics
510    ///
511    /// Panics if called within a closure provided to the [`Accessor::with`]
512    /// function. This can only be called outside an active invocation of
513    /// [`Accessor::with`].
514    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515    where
516        T: 'static,
517    {
518        let accessor = self.clone_for_spawn();
519        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520    }
521
522    fn clone_for_spawn(&self) -> Self {
523        Self {
524            token: self.token,
525            get_data: self.get_data,
526        }
527    }
528}
529
530/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
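///
/// A non-compiled sketch of a simple task implementation; the `Ctx` type and
/// its `log` field are hypothetical:
///
/// ```ignore
/// struct FlushTask;
///
/// impl AccessorTask<Ctx> for FlushTask {
///     async fn run(self, accessor: &Accessor<Ctx>) -> Result<()> {
///         // Record that the task ran, then finish.
///         accessor.with(|mut access| access.get().log.push("flushed".to_string()));
///         Ok(())
///     }
/// }
///
/// // Spawn it from, e.g., a concurrent host function:
/// let _handle = accessor.spawn(FlushTask);
/// ```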
532// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
533// option.
534//
535// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
536// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
537// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
538// Send + Sync + 'static` fails with a type mismatch error when we try to pass
539// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
540// best we can do for the time being.
541pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
542where
543    D: HasData + ?Sized,
544{
545    /// Run the task.
546    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
547}
548
549/// Represents parameter and result metadata for the caller side of a
550/// guest->guest call orchestrated by a fused adapter.
551enum CallerInfo {
552    /// Metadata for a call to an async-lowered import
553    Async {
554        params: Vec<ValRaw>,
555        has_result: bool,
556    },
    /// Metadata for a call to a sync-lowered import
558    Sync {
559        params: Vec<ValRaw>,
560        result_count: u32,
561    },
562}
563
564/// Indicates how a guest task is waiting on a waitable set.
565enum WaitMode {
566    /// The guest task is waiting using `task.wait`
567    Fiber(StoreFiber<'static>),
568    /// The guest task is waiting via a callback declared as part of an
569    /// async-lifted export.
570    Callback(Instance),
571}
572
573/// Represents the reason a fiber is suspending itself.
574#[derive(Debug)]
575enum SuspendReason {
576    /// The fiber is waiting for an event to be delivered to the specified
577    /// waitable set or task.
578    Waiting {
579        set: TableId<WaitableSet>,
580        thread: QualifiedThreadId,
581        skip_may_block_check: bool,
582    },
583    /// The fiber has finished handling its most recent work item and is waiting
584    /// for another (or to be dropped if it is no longer needed).
585    NeedWork,
586    /// The fiber is yielding and should be resumed once other tasks have had a
587    /// chance to run.
588    Yielding {
589        thread: QualifiedThreadId,
590        skip_may_block_check: bool,
591    },
592    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
593    ExplicitlySuspending {
594        thread: QualifiedThreadId,
595        skip_may_block_check: bool,
596    },
597}
598
599/// Represents a pending call into guest code for a given guest task.
600enum GuestCallKind {
601    /// Indicates there's an event to deliver to the task, possibly related to a
602    /// waitable set the task has been waiting on or polling.
603    DeliverEvent {
604        /// The instance to which the task belongs.
605        instance: Instance,
606        /// The waitable set the event belongs to, if any.
607        ///
608        /// If this is `None` the event will be waiting in the
609        /// `GuestTask::event` field for the task.
610        set: Option<TableId<WaitableSet>>,
611    },
612    /// Indicates that a new guest task call is pending and may be executed
613    /// using the specified closure.
614    ///
615    /// If the closure returns `Ok(Some(call))`, the `call` should be run
616    /// immediately using `handle_guest_call`.
617    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
618    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
619}
620
621impl fmt::Debug for GuestCallKind {
622    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623        match self {
624            Self::DeliverEvent { instance, set } => f
625                .debug_struct("DeliverEvent")
626                .field("instance", instance)
627                .field("set", set)
628                .finish(),
629            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
630            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
631        }
632    }
633}
634
635/// The target of a suspension intrinsic.
636#[derive(Copy, Clone, Debug)]
637pub enum SuspensionTarget {
638    SomeSuspended(u32),
639    Some(u32),
640    None,
641}
642
643impl SuspensionTarget {
644    fn is_none(&self) -> bool {
645        matches!(self, SuspensionTarget::None)
646    }
647    fn is_some(&self) -> bool {
648        !self.is_none()
649    }
650}
651
652/// Represents a pending call into guest code for a given guest thread.
653#[derive(Debug)]
654struct GuestCall {
655    thread: QualifiedThreadId,
656    kind: GuestCallKind,
657}
658
659impl GuestCall {
660    /// Returns whether or not the call is ready to run.
661    ///
662    /// A call will not be ready to run if either:
663    ///
664    /// - the (sub-)component instance to be called has already been entered and
665    /// cannot be reentered until an in-progress call completes
666    ///
667    /// - the call is for a not-yet started task and the (sub-)component
668    /// instance to be called has backpressure enabled
669    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
670        let instance = store
671            .concurrent_state_mut()
672            .get_mut(self.thread.task)?
673            .instance;
674        let state = store.instance_state(instance).concurrent_state();
675
676        let ready = match &self.kind {
677            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
678            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
679            GuestCallKind::StartExplicit(_) => true,
680        };
681        log::trace!(
682            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
683            state.do_not_enter,
684            state.backpressure
685        );
686        Ok(ready)
687    }
688}
689
690/// Job to be run on a worker fiber.
691enum WorkerItem {
692    GuestCall(GuestCall),
693    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
694}
695
696/// Represents a pending work item to be handled by the event loop for a given
697/// component instance.
698enum WorkItem {
699    /// A host task to be pushed to `ConcurrentState::futures`.
700    PushFuture(AlwaysMut<HostTaskFuture>),
701    /// A fiber to resume.
702    ResumeFiber(StoreFiber<'static>),
703    /// A thread to resume.
704    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
705    /// A pending call into guest code for a given guest task.
706    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
707    /// A job to run on a worker fiber.
708    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
709}
710
711impl fmt::Debug for WorkItem {
712    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
713        match self {
714            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
715            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
716            Self::ResumeThread(instance, thread) => f
717                .debug_tuple("ResumeThread")
718                .field(instance)
719                .field(thread)
720                .finish(),
721            Self::GuestCall(instance, call) => f
722                .debug_tuple("GuestCall")
723                .field(instance)
724                .field(call)
725                .finish(),
726            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
727        }
728    }
729}
730
731/// Whether a suspension intrinsic was cancelled or completed
732#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
733pub(crate) enum WaitResult {
734    Cancelled,
735    Completed,
736}
737
738/// Poll the specified future until it completes on behalf of a guest->host call
739/// using a sync-lowered import.
740///
741/// This is similar to `Instance::first_poll` except it's for sync-lowered
742/// imports, meaning we don't need to handle cancellation and we can block the
743/// caller until the task completes, at which point the caller can handle
744/// lowering the result to the guest's stack and linear memory.
745pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
746    store: &mut dyn VMStore,
747    future: impl Future<Output = Result<R>> + Send + 'static,
748) -> Result<R> {
749    let state = store.concurrent_state_mut();
750    let task = state.unwrap_current_host_thread();
751
752    // Wrap the future in a closure which will take care of stashing the result
753    // in `GuestTask::result` and resuming this fiber when the host task
754    // completes.
755    let mut future = Box::pin(async move {
756        let result = future.await?;
757        tls::get(move |store| {
758            let state = store.concurrent_state_mut();
759            state.get_mut(task)?.result = Some(Box::new(result) as _);
760
761            Waitable::Host(task).set_event(
762                state,
763                Some(Event::Subtask {
764                    status: Status::Returned,
765                }),
766            )?;
767
768            Ok(())
769        })
770    }) as HostTaskFuture;
771
772    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
773    // add the future to `ConcurrentState::futures` and poll it automatically
774    // from the event loop if it doesn't complete immediately here.
775    let poll = tls::set(store, || {
776        future
777            .as_mut()
778            .poll(&mut Context::from_waker(&Waker::noop()))
779    });
780
781    match poll {
782        // It completed immediately; check the result and delete the task.
783        Poll::Ready(result) => result?,
784
785        // It did not complete immediately; add it to
786        // `ConcurrentState::futures` so it will be polled via the event loop;
787        // then use `GuestTask::sync_call_set` to wait for the task to
788        // complete, suspending the current fiber until it does so.
789        Poll::Pending => {
790            let state = store.concurrent_state_mut();
791            state.push_future(future);
792
793            let caller = state.get_mut(task)?.caller;
794            let set = state.get_mut(caller.task)?.sync_call_set;
795            Waitable::Host(task).join(state, Some(set))?;
796
797            store.suspend(SuspendReason::Waiting {
798                set,
799                thread: caller,
800                skip_may_block_check: false,
801            })?;
802
            // Remove the `task` from the `sync_call_set` to ensure that, when
            // this function returns and the task is deleted, there are no
            // more lingering references to this host task.
806            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
807        }
808    }
809
810    // Retrieve and return the result.
811    Ok(*store
812        .concurrent_state_mut()
813        .get_mut(task)?
814        .result
815        .take()
816        .unwrap()
817        .downcast()
818        .unwrap())
819}
820
821/// Execute the specified guest call.
822fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
823    let mut next = Some(call);
824    while let Some(call) = next.take() {
825        match call.kind {
826            GuestCallKind::DeliverEvent { instance, set } => {
827                let (event, waitable) = instance
828                    .get_event(store, call.thread.task, set, true)?
829                    .unwrap();
830                let state = store.concurrent_state_mut();
831                let task = state.get_mut(call.thread.task)?;
832                let runtime_instance = task.instance;
833                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
834
835                log::trace!(
836                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
837                    call.thread,
838                );
839
840                let old_thread = store.set_thread(call.thread);
841                log::trace!(
842                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
843                    call.thread
844                );
845
846                store.enter_instance(runtime_instance);
847
848                let callback = store
849                    .concurrent_state_mut()
850                    .get_mut(call.thread.task)?
851                    .callback
852                    .take()
853                    .unwrap();
854
855                let code = callback(store, event, handle)?;
856
857                store
858                    .concurrent_state_mut()
859                    .get_mut(call.thread.task)?
860                    .callback = Some(callback);
861
862                store.exit_instance(runtime_instance)?;
863
864                store.set_thread(old_thread);
865
866                next = instance.handle_callback_code(
867                    store,
868                    call.thread,
869                    runtime_instance.index,
870                    code,
871                )?;
872
873                log::trace!(
874                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
875                );
876            }
877            GuestCallKind::StartImplicit(fun) => {
878                next = fun(store)?;
879            }
880            GuestCallKind::StartExplicit(fun) => {
881                fun(store)?;
882            }
883        }
884    }
885
886    Ok(())
887}
888
889impl<T> Store<T> {
890    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
891    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
892    where
893        T: Send + 'static,
894    {
895        ensure!(
896            self.as_context().0.concurrency_support(),
897            "cannot use `run_concurrent` when Config::concurrency_support disabled",
898        );
899        self.as_context_mut().run_concurrent(fun).await
900    }
901
902    #[doc(hidden)]
903    pub fn assert_concurrent_state_empty(&mut self) {
904        self.as_context_mut().assert_concurrent_state_empty();
905    }
906
907    #[doc(hidden)]
908    pub fn concurrent_state_table_size(&mut self) -> usize {
909        self.as_context_mut().concurrent_state_table_size()
910    }
911
912    /// Convenience wrapper for [`StoreContextMut::spawn`].
913    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
914    where
915        T: 'static,
916    {
917        self.as_context_mut().spawn(task)
918    }
919}
920
921impl<T> StoreContextMut<'_, T> {
922    /// Assert that all the relevant tables and queues in the concurrent state
923    /// for this store are empty.
924    ///
925    /// This is for sanity checking in integration tests
926    /// (e.g. `component-async-tests`) that the relevant state has been cleared
927    /// after each test concludes.  This should help us catch leaks, e.g. guest
928    /// tasks which haven't been deleted despite having completed and having
929    /// been dropped by their supertasks.
930    ///
931    /// Only intended for use in Wasmtime's own testing.
932    #[doc(hidden)]
933    pub fn assert_concurrent_state_empty(self) {
934        let store = self.0;
935        store
936            .store_data_mut()
937            .components
938            .assert_instance_states_empty();
939        let state = store.concurrent_state_mut();
940        assert!(
941            state.table.get_mut().is_empty(),
942            "non-empty table: {:?}",
943            state.table.get_mut()
944        );
945        assert!(state.high_priority.is_empty());
946        assert!(state.low_priority.is_empty());
947        assert!(state.current_thread.is_none());
948        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
949        assert!(state.global_error_context_ref_counts.is_empty());
950    }
951
952    /// Helper function to perform tests over the size of the concurrent state
953    /// table which can be useful for detecting leaks.
954    ///
955    /// Only intended for use in Wasmtime's own testing.
956    #[doc(hidden)]
957    pub fn concurrent_state_table_size(&mut self) -> usize {
958        self.0
959            .concurrent_state_mut()
960            .table
961            .get_mut()
962            .iter_mut()
963            .count()
964    }
965
    /// Spawn a background task to run as part of this store's event loop.
    ///
    /// The task will receive an `&Accessor<T>` and run concurrently with
    /// any other tasks in progress for the store.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this store is run.
973    ///
974    /// The returned [`JoinHandle`] may be used to cancel the task.
975    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
976    where
977        T: 'static,
978    {
979        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
980        self.spawn_with_accessor(accessor, task)
981    }
982
983    /// Internal implementation of `spawn` functions where a `store` is
984    /// available along with an `Accessor`.
985    fn spawn_with_accessor<D>(
986        self,
987        accessor: Accessor<T, D>,
988        task: impl AccessorTask<T, D>,
989    ) -> JoinHandle
990    where
991        T: 'static,
992        D: HasData + ?Sized,
993    {
994        // Create an "abortable future" here where internally the future will
995        // hook calls to poll and possibly spawn more background tasks on each
996        // iteration.
997        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
998        self.0
999            .concurrent_state_mut()
1000            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1001        handle
1002    }
1003
1004    /// Run the specified closure `fun` to completion as part of this store's
1005    /// event loop.
1006    ///
1007    /// This will run `fun` as part of this store's event loop until it
1008    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1009    /// controlled access to the store and its data.
1010    ///
1011    /// This function can be used to invoke [`Func::call_concurrent`] for
1012    /// example within the async closure provided here.
1013    ///
1014    /// This function will unconditionally return an error if
1015    /// [`Config::concurrency_support`] is disabled.
1016    ///
1017    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1018    ///
1019    /// # Store-blocking behavior
1020    ///
1021    /// At this time there are certain situations in which the `Future` returned
1022    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1023    /// extended period of time, despite one or more `Waker::wake` events having
1024    /// occurred for the task to which it belongs.  This can manifest as the
1025    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1026    /// the `Store` being held by e.g. a blocking host function, preventing the
1027    /// `Future` from being polled. A canonical example of this is when the
1028    /// `fun` provided to this function attempts to set a timeout for an
1029    /// invocation of a wasm function. In this situation the async closure is
1030    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1031    /// to elapse. At this time this setup will not always work and the timeout
1032    /// may not reliably fire.
1033    ///
1034    /// This function will not block the current thread and as such is always
1035    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress before the closure can make progress. This is an
1038    /// artifact of Wasmtime's historical implementation of `async` functions
1039    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1040    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1041    /// progress for a readiness notification of (b) to get delivered.
1042    ///
1043    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which is what timeouts,
    /// for example, are based on. Fixing this requires some relatively major refactoring
1046    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1047    /// that is intended to be fixed one day. In the meantime it's recommended
1048    /// to apply timeouts or such to the entire `run_concurrent` call itself
1049    /// rather than internally.
1050    ///
1051    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1052    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1053    ///
1054    /// # Example
1055    ///
1056    /// ```
1057    /// # use {
1058    /// #   wasmtime::{
1059    /// #     error::{Result},
1060    /// #     component::{ Component, Linker, Resource, ResourceTable},
1061    /// #     Config, Engine, Store
1062    /// #   },
1063    /// # };
1064    /// #
1065    /// # struct MyResource(u32);
1066    /// # struct Ctx { table: ResourceTable }
1067    /// #
1068    /// # async fn foo() -> Result<()> {
1069    /// # let mut config = Config::new();
1070    /// # let engine = Engine::new(&config)?;
1071    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1072    /// # let mut linker = Linker::new(&engine);
1073    /// # let component = Component::new(&engine, "")?;
1074    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1075    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1076    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1077    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1078    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1079    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1080    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1081    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1082    ///    Ok(())
1083    /// }).await??;
1084    /// # Ok(())
1085    /// # }
1086    /// ```
1087    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1088    where
1089        T: Send + 'static,
1090    {
1091        ensure!(
1092            self.0.concurrency_support(),
1093            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1094        );
1095        self.do_run_concurrent(fun, false).await
1096    }
1097
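    /// Like [`Self::run_concurrent`], except that the returned future resolves
    /// with a `Trap::AsyncDeadlock` error if the event loop runs out of work
    /// to do before the future returned by `fun` completes.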
1098    pub(super) async fn run_concurrent_trap_on_idle<R>(
1099        self,
1100        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1101    ) -> Result<R>
1102    where
1103        T: Send + 'static,
1104    {
1105        self.do_run_concurrent(fun, true).await
1106    }
1107
1108    async fn do_run_concurrent<R>(
1109        mut self,
1110        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1111        trap_on_idle: bool,
1112    ) -> Result<R>
1113    where
1114        T: Send + 'static,
1115    {
1116        debug_assert!(self.0.concurrency_support());
1117        check_recursive_run();
1118        let token = StoreToken::new(self.as_context_mut());
1119
1120        struct Dropper<'a, T: 'static, V> {
1121            store: StoreContextMut<'a, T>,
1122            value: ManuallyDrop<V>,
1123        }
1124
1125        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1126            fn drop(&mut self) {
1127                tls::set(self.store.0, || {
1128                    // SAFETY: Here we drop the value without moving it for the
1129                    // first and only time -- per the contract for `Drop::drop`,
1130                    // this code won't run again, and the `value` field will no
1131                    // longer be accessible.
1132                    unsafe { ManuallyDrop::drop(&mut self.value) }
1133                });
1134            }
1135        }
1136
1137        let accessor = &Accessor::new(token);
1138        let dropper = &mut Dropper {
1139            store: self,
1140            value: ManuallyDrop::new(fun(accessor)),
1141        };
1142        // SAFETY: We never move `dropper` nor its `value` field.
1143        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1144
1145        dropper
1146            .store
1147            .as_context_mut()
1148            .poll_until(future, trap_on_idle)
1149            .await
1150    }
1151
1152    /// Run this store's event loop.
1153    ///
1154    /// The returned future will resolve when the specified future completes or,
1155    /// if `trap_on_idle` is true, when the event loop can't make further
1156    /// progress.
1157    async fn poll_until<R>(
1158        mut self,
1159        mut future: Pin<&mut impl Future<Output = R>>,
1160        trap_on_idle: bool,
1161    ) -> Result<R>
1162    where
1163        T: Send + 'static,
1164    {
1165        struct Reset<'a, T: 'static> {
1166            store: StoreContextMut<'a, T>,
1167            futures: Option<FuturesUnordered<HostTaskFuture>>,
1168        }
1169
1170        impl<'a, T> Drop for Reset<'a, T> {
1171            fn drop(&mut self) {
1172                if let Some(futures) = self.futures.take() {
1173                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1174                }
1175            }
1176        }
1177
1178        loop {
1179            // Take `ConcurrentState::futures` out of the store so we can poll
1180            // it while also safely giving any of the futures inside access to
1181            // `self`.
1182            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1183            let mut reset = Reset {
1184                store: self.as_context_mut(),
1185                futures,
1186            };
1187            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1188
1189            enum PollResult<R> {
1190                Complete(R),
1191                ProcessWork(Vec<WorkItem>),
1192            }
1193            let result = future::poll_fn(|cx| {
1194                // First, poll the future we were passed as an argument and
1195                // return immediately if it's ready.
1196                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1197                    return Poll::Ready(Ok(PollResult::Complete(value)));
1198                }
1199
1200                // Next, poll `ConcurrentState::futures` (which includes any
1201                // pending host tasks and/or background tasks), returning
1202                // immediately if one of them fails.
1203                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1204                    Poll::Ready(Some(output)) => {
1205                        match output {
1206                            Err(e) => return Poll::Ready(Err(e)),
1207                            Ok(()) => {}
1208                        }
1209                        Poll::Ready(true)
1210                    }
1211                    Poll::Ready(None) => Poll::Ready(false),
1212                    Poll::Pending => Poll::Pending,
1213                };
1214
1215                // Next, collect the next batch of work items to process, if any.
1216                // This will be either all of the high-priority work items, or if
1217                // there are none, a single low-priority work item.
1218                let state = reset.store.0.concurrent_state_mut();
1219                let ready = state.collect_work_items_to_run();
1220                if !ready.is_empty() {
1221                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1222                }
1223
1224                // Finally, if we have nothing else to do right now, determine what to do
1225                // based on whether there are any pending futures in
1226                // `ConcurrentState::futures`.
1227                return match next {
1228                    Poll::Ready(true) => {
1229                        // In this case, one of the futures in
1230                        // `ConcurrentState::futures` completed
1231                        // successfully, so we return now and continue
1232                        // the outer loop in case there is another one
1233                        // ready to complete.
1234                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1235                    }
1236                    Poll::Ready(false) => {
1237                        // Poll the future we were passed one last time
1238                        // in case one of `ConcurrentState::futures` had
1239                        // the side effect of unblocking it.
1240                        if let Poll::Ready(value) =
1241                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1242                        {
1243                            Poll::Ready(Ok(PollResult::Complete(value)))
1244                        } else {
1245                            // In this case, there are no more pending
1246                            // futures in `ConcurrentState::futures`,
1247                            // there are no remaining work items, _and_
1248                            // the future we were passed as an argument
1249                            // still hasn't completed.
1250                            if trap_on_idle {
1251                                // `trap_on_idle` is true, so we exit
1252                                // immediately.
1253                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1254                            } else {
1255                                // `trap_on_idle` is false, so we assume
1256                                // that future will wake up and give us
1257                                // more work to do when it's ready to.
1258                                Poll::Pending
1259                            }
1260                        }
1261                    }
1262                    // There is at least one pending future in
1263                    // `ConcurrentState::futures` and we have nothing
1264                    // else to do but wait for now, so we return
1265                    // `Pending`.
1266                    Poll::Pending => Poll::Pending,
1267                };
1268            })
1269            .await;
1270
1271            // Put the `ConcurrentState::futures` back into the store before we
1272            // return or handle any work items since one or more of those items
1273            // might append more futures.
1274            drop(reset);
1275
1276            match result? {
1277                // The future we were passed as an argument completed, so we
1278                // return the result.
1279                PollResult::Complete(value) => break Ok(value),
1280                // The future we were passed has not yet completed, so handle
1281                // any work items and then loop again.
1282                PollResult::ProcessWork(ready) => {
1283                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1284                        store: StoreContextMut<'a, T>,
1285                        ready: I,
1286                    }
1287
1288                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1289                        fn drop(&mut self) {
1290                            while let Some(item) = self.ready.next() {
1291                                match item {
1292                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1293                                    WorkItem::PushFuture(future) => {
1294                                        tls::set(self.store.0, move || drop(future))
1295                                    }
1296                                    _ => {}
1297                                }
1298                            }
1299                        }
1300                    }
1301
1302                    let mut dispose = Dispose {
1303                        store: self.as_context_mut(),
1304                        ready: ready.into_iter(),
1305                    };
1306
1307                    while let Some(item) = dispose.ready.next() {
1308                        dispose
1309                            .store
1310                            .as_context_mut()
1311                            .handle_work_item(item)
1312                            .await?;
1313                    }
1314                }
1315            }
1316        }
1317    }
1318
1319    /// Handle the specified work item, possibly resuming a fiber if applicable.
1320    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1321    where
1322        T: Send,
1323    {
1324        log::trace!("handle work item {item:?}");
1325        match item {
1326            WorkItem::PushFuture(future) => {
1327                self.0
1328                    .concurrent_state_mut()
1329                    .futures
1330                    .get_mut()
1331                    .as_mut()
1332                    .unwrap()
1333                    .push(future.into_inner());
1334            }
1335            WorkItem::ResumeFiber(fiber) => {
1336                self.0.resume_fiber(fiber).await?;
1337            }
1338            WorkItem::ResumeThread(_, thread) => {
1339                if let GuestThreadState::Ready(fiber) = mem::replace(
1340                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1341                    GuestThreadState::Running,
1342                ) {
1343                    self.0.resume_fiber(fiber).await?;
1344                } else {
1345                    bail!("cannot resume non-pending thread {thread:?}");
1346                }
1347            }
1348            WorkItem::GuestCall(_, call) => {
1349                if call.is_ready(self.0)? {
1350                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1351                } else {
1352                    let state = self.0.concurrent_state_mut();
1353                    let task = state.get_mut(call.thread.task)?;
1354                    if !task.starting_sent {
1355                        task.starting_sent = true;
1356                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1357                            Waitable::Guest(call.thread.task).set_event(
1358                                state,
1359                                Some(Event::Subtask {
1360                                    status: Status::Starting,
1361                                }),
1362                            )?;
1363                        }
1364                    }
1365
1366                    let instance = state.get_mut(call.thread.task)?.instance;
1367                    self.0
1368                        .instance_state(instance)
1369                        .concurrent_state()
1370                        .pending
1371                        .insert(call.thread, call.kind);
1372                }
1373            }
1374            WorkItem::WorkerFunction(fun) => {
1375                self.run_on_worker(WorkerItem::Function(fun)).await?;
1376            }
1377        }
1378
1379        Ok(())
1380    }
1381
1382    /// Execute the specified work item on a worker fiber.
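    ///
    /// The worker fiber is created lazily and reused: it loops, handling one
    /// item at a time and suspending with `SuspendReason::NeedWork` between
    /// items.  As a rough, store-free analogy of that reuse pattern, using a
    /// std thread and channel in place of a fiber (names here are illustrative
    /// only, not part of any Wasmtime API):
    ///
    /// ```ignore
    /// use std::sync::mpsc;
    ///
    /// enum Item {
    ///     Work(u32),
    ///     Shutdown,
    /// }
    ///
    /// let (tx, rx) = mpsc::channel();
    /// // The "worker" blocks (i.e. "suspends") whenever it needs more work.
    /// let worker = std::thread::spawn(move || {
    ///     while let Ok(item) = rx.recv() {
    ///         match item {
    ///             Item::Work(n) => println!("handled work item {n}"),
    ///             Item::Shutdown => break,
    ///         }
    ///     }
    /// });
    /// for n in 0..3 {
    ///     tx.send(Item::Work(n)).unwrap();
    /// }
    /// tx.send(Item::Shutdown).unwrap();
    /// worker.join().unwrap();
    /// ```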
1383    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1384    where
1385        T: Send,
1386    {
1387        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1388            fiber
1389        } else {
1390            fiber::make_fiber(self.0, move |store| {
1391                loop {
1392                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1393                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1394                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1395                    }
1396
1397                    store.suspend(SuspendReason::NeedWork)?;
1398                }
1399            })?
1400        };
1401
1402        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1403        assert!(worker_item.is_none());
1404        *worker_item = Some(item);
1405
1406        self.0.resume_fiber(worker).await
1407    }
1408
1409    /// Wrap the specified host function in a future which will call it, passing
1410    /// it an `&Accessor<T>`.
1411    ///
1412    /// See the `Accessor` documentation for details.
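    ///
    /// A rough sketch of the closure shape this expects, with hypothetical
    /// names (`store`, `MyData`) and the coercion to the boxed trait object
    /// elided:
    ///
    /// ```ignore
    /// let future = store.wrap_call(|accessor: &Accessor<MyData>| {
    ///     Box::pin(async move {
    ///         // `Accessor::with` may be used here to access store data
    ///         // between (but not across) await points.
    ///         Ok(42u32)
    ///     })
    /// });
    /// ```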
1413    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1414    where
1415        T: 'static,
1416        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1417            + Send
1418            + Sync
1419            + 'static,
1420        R: Send + Sync + 'static,
1421    {
1422        let token = StoreToken::new(self);
1423        async move {
1424            let mut accessor = Accessor::new(token);
1425            closure(&mut accessor).await
1426        }
1427    }
1428}
1429
1430impl StoreOpaque {
1431    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1432    /// guest-to-guest call or a sync host-to-guest call.
1433    ///
1434    /// This task will only be used for the purpose of handling calls to
1435    /// intrinsic functions; both parameter lowering and result lifting are
1436    /// assumed to be taken care of elsewhere.
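    ///
    /// This is paired with `exit_guest_sync_call`; a hypothetical call site
    /// (here for a host caller) brackets the synchronous call roughly like so:
    ///
    /// ```ignore
    /// store.enter_guest_sync_call(None, false, callee)?;
    /// // ... perform the synchronous call into `callee` ...
    /// store.exit_guest_sync_call(false)?;
    /// ```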
1437    pub(crate) fn enter_guest_sync_call(
1438        &mut self,
1439        guest_caller: Option<RuntimeInstance>,
1440        callee_async: bool,
1441        callee: RuntimeInstance,
1442    ) -> Result<()> {
1443        log::trace!("enter sync call {callee:?}");
1444        if !self.concurrency_support() {
1445            return Ok(self.enter_call_not_concurrent());
1446        }
1447
1448        let state = self.concurrent_state_mut();
1449        let thread = state.current_thread;
1450        let instance = if let Some(thread) = thread.guest() {
1451            Some(state.get_mut(thread.task)?.instance)
1452        } else {
1453            None
1454        };
1455        let task = GuestTask::new(
1456            state,
1457            Box::new(move |_, _| unreachable!()),
1458            LiftResult {
1459                lift: Box::new(move |_, _| unreachable!()),
1460                ty: TypeTupleIndex::reserved_value(),
1461                memory: None,
1462                string_encoding: StringEncoding::Utf8,
1463            },
1464            if let Some(caller) = guest_caller {
1465                assert_eq!(caller, instance.unwrap());
1466                Caller::Guest {
1467                    thread: *thread.guest().unwrap(),
1468                }
1469            } else {
1470                Caller::Host {
1471                    tx: None,
1472                    exit_tx: Arc::new(oneshot::channel().0),
1473                    host_future_present: false,
1474                    caller: thread,
1475                }
1476            },
1477            None,
1478            callee,
1479            callee_async,
1480        )?;
1481
1482        let guest_task = state.push(task)?;
1483        let new_thread = GuestThread::new_implicit(guest_task);
1484        let guest_thread = state.push(new_thread)?;
1485        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1486            guest_thread,
1487            self,
1488            callee.index,
1489        )?;
1490
1491        let state = self.concurrent_state_mut();
1492        state.get_mut(guest_task)?.threads.insert(guest_thread);
1493        if guest_caller.is_some() {
1494            let thread = thread.guest().unwrap();
1495            state.get_mut(thread.task)?.subtasks.insert(guest_task);
1496        }
1497
1498        self.set_thread(QualifiedThreadId {
1499            task: guest_task,
1500            thread: guest_thread,
1501        });
1502
1503        Ok(())
1504    }
1505
1506    /// Pop a `GuestTask` previously pushed using `enter_guest_sync_call`.
1507    pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1508        if !self.concurrency_support() {
1509            return Ok(self.exit_call_not_concurrent());
1510        }
1511        let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1512        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1513        log::trace!("exit sync call {instance:?}");
1514        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1515            self,
1516            thread,
1517            instance.index,
1518        )?;
1519
1520        let state = self.concurrent_state_mut();
1521        let task = state.get_mut(thread.task)?;
1522        let caller = match &task.caller {
1523            &Caller::Guest { thread } => {
1524                assert!(guest_caller);
1525                thread.into()
1526            }
1527            &Caller::Host { caller, .. } => {
1528                assert!(!guest_caller);
1529                caller
1530            }
1531        };
1532        self.set_thread(caller);
1533
1534        let state = self.concurrent_state_mut();
1535        let task = state.get_mut(thread.task)?;
1536        if task.ready_to_delete() {
1537            state.delete(thread.task)?.dispose(state, thread.task)?;
1538        }
1539
1540        Ok(())
1541    }
1542
1543    /// Similar to `enter_guest_sync_call`, except used when the guest makes a
1544    /// transition to the host.
1545    ///
1546    /// FIXME: this is called for all guest->host transitions and performs some
1547    /// relatively expensive table manipulations. This would ideally be
1548    /// optimized to avoid the full allocation of a `HostTask` in at least some
1549    /// situations.
1550    pub fn enter_host_call(&mut self) -> Result<()> {
1551        let state = self.concurrent_state_mut();
1552        let caller = state.unwrap_current_guest_thread();
1553        let task = state.push(HostTask::new(caller))?;
1554        log::trace!("new host task {task:?}");
1555        self.set_thread(task);
1556        Ok(())
1557    }
1558
1559    /// Dual of `enter_host_call`: signifies that the host call has finished and
1560    /// that the host task will be cleaned up.
1561    ///
1562    /// Note that this isn't invoked when the host function is called
1563    /// asynchronously and hasn't completed yet. In that situation the host task
1564    /// persists and is cleaned up separately.
1565    pub fn exit_host_call(&mut self) -> Result<()> {
1566        let task = self.concurrent_state_mut().unwrap_current_host_thread();
1567        log::trace!("delete host task {task:?}");
1568        let task = self.concurrent_state_mut().delete(task)?;
1569        self.set_thread(task.caller);
1570        Ok(())
1571    }
1572
1573    /// Determine whether the specified instance may be entered from the host.
1574    ///
1575    /// We return `true` here only if all of the following hold:
1576    ///
1577    /// - The top-level instance is not already on the current task's call stack.
1578    /// - The instance is not in need of a post-return function call.
1579    /// - `self` has not been poisoned due to a trap.
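    ///
    /// Conceptually, the check below walks the chain of callers and compares
    /// top-level instance IDs; with the task and thread tables stripped away
    /// it amounts to something like this (illustrative types only):
    ///
    /// ```ignore
    /// struct Frame {
    ///     instance: u32,
    ///     caller: Option<Box<Frame>>,
    /// }
    ///
    /// fn may_enter(mut cur: Option<&Frame>, instance: u32) -> bool {
    ///     while let Some(frame) = cur {
    ///         if frame.instance == instance {
    ///             return false; // already on the call stack
    ///         }
    ///         cur = frame.caller.as_deref();
    ///     }
    ///     true
    /// }
    /// ```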
1580    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1581        if self.trapped() {
1582            return false;
1583        }
1584        if !self.concurrency_support() {
1585            return true;
1586        }
1587        let state = self.concurrent_state_mut();
1588        let mut cur = state.current_thread;
1589        loop {
1590            match cur {
1591                CurrentThread::None => break true,
1592                CurrentThread::Guest(thread) => {
1593                    let task = state.get_mut(thread.task).unwrap();
1594
1595                    // Note that we only compare top-level instance IDs here.
1596                    // The idea is that the host is not allowed to recursively
1597                    // enter a top-level instance even if the specific leaf
1598                    // instance is not on the stack. This is the behavior defined
1599                    // in the spec, and it allows us to elide runtime checks in
1600                    // guest-to-guest adapters.
1601                    if task.instance.instance == instance.instance {
1602                        break false;
1603                    }
1604                    cur = match task.caller {
1605                        Caller::Host { caller, .. } => caller,
1606                        Caller::Guest { thread } => thread.into(),
1607                    };
1608                }
1609                CurrentThread::Host(id) => {
1610                    cur = state.get_mut(id).unwrap().caller.into();
1611                }
1612            }
1613        }
1614    }
1615
1616    /// Helper function to retrieve the `InstanceState` for the
1617    /// specified instance.
1618    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1619        self.component_instance_mut(instance.instance)
1620            .instance_state(instance.index)
1621    }
1622
1623    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1624        // Each time we switch threads, we conservatively set `task_may_block`
1625        // to `false` for the component instance we're switching away from (if
1626        // any), meaning it will be `false` for any new thread created for that
1627        // instance unless explicitly set otherwise.
1628        let state = self.concurrent_state_mut();
1629        let old_thread = mem::replace(&mut state.current_thread, thread.into());
1630        if let Some(old_thread) = old_thread.guest() {
1631            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1632            self.component_instance_mut(instance)
1633                .set_task_may_block(false)
1634        }
1635
1636        // If we're switching to a new thread, set its component instance's
1637        // `task_may_block` according to where it left off.
1638        if self.concurrent_state_mut().current_thread.guest().is_some() {
1639            self.set_task_may_block();
1640        }
1641
1642        old_thread
1643    }
1644
1645    /// Set the global variable representing whether the current task may block
1646    /// prior to entering Wasm code.
1647    fn set_task_may_block(&mut self) {
1648        let state = self.concurrent_state_mut();
1649        let guest_thread = state.unwrap_current_guest_thread();
1650        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1651        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1652        self.component_instance_mut(instance)
1653            .set_task_may_block(may_block)
1654    }
1655
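    /// Return an error if the current guest task is not permitted to block,
    /// i.e. if the `task_may_block` flag for its component instance is unset.
    ///
    /// This is a no-op when concurrency support is not enabled for this store.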
1656    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1657        if !self.concurrency_support() {
1658            return Ok(());
1659        }
1660        let state = self.concurrent_state_mut();
1661        let task = state.unwrap_current_guest_thread().task;
1662        let instance = state.get_mut(task).unwrap().instance.instance;
1663        let task_may_block = self.component_instance(instance).get_task_may_block();
1664
1665        if task_may_block {
1666            Ok(())
1667        } else {
1668            Err(Trap::CannotBlockSyncTask.into())
1669        }
1670    }
1671
1672    /// Record that we're about to enter a (sub-)component instance which does
1673    /// not support more than one concurrent, stackful activation, meaning it
1674    /// cannot be entered again until the next call returns.
1675    fn enter_instance(&mut self, instance: RuntimeInstance) {
1676        log::trace!("enter {instance:?}");
1677        self.instance_state(instance)
1678            .concurrent_state()
1679            .do_not_enter = true;
1680    }
1681
1682    /// Record that we've exited a (sub-)component instance previously entered
1683    /// with `Self::enter_instance`, then call `Self::partition_pending`.
1684    /// See the documentation for the latter for details.
1685    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1686        log::trace!("exit {instance:?}");
1687        self.instance_state(instance)
1688            .concurrent_state()
1689            .do_not_enter = false;
1690        self.partition_pending(instance)
1691    }
1692
1693    /// Iterate over `InstanceState::pending`, moving any ready items into the
1694    /// "high priority" work item queue.
1695    ///
1696    /// See `GuestCall::is_ready` for details.
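    ///
    /// With the instance and table plumbing stripped away, the partitioning
    /// step has roughly this shape (illustrative types only):
    ///
    /// ```ignore
    /// use std::collections::{HashMap, VecDeque};
    ///
    /// fn partition(
    ///     pending: &mut HashMap<u32, &'static str>, // thread -> call kind
    ///     is_ready: impl Fn(u32) -> bool,
    ///     high_priority: &mut VecDeque<(u32, &'static str)>,
    /// ) {
    ///     for (thread, kind) in std::mem::take(pending) {
    ///         if is_ready(thread) {
    ///             high_priority.push_back((thread, kind)); // eligible to run now
    ///         } else {
    ///             pending.insert(thread, kind); // still blocked; keep it pending
    ///         }
    ///     }
    /// }
    /// ```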
1697    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1698        for (thread, kind) in
1699            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1700        {
1701            let call = GuestCall { thread, kind };
1702            if call.is_ready(self)? {
1703                self.concurrent_state_mut()
1704                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
1705            } else {
1706                self.instance_state(instance)
1707                    .concurrent_state()
1708                    .pending
1709                    .insert(call.thread, call.kind);
1710            }
1711        }
1712
1713        Ok(())
1714    }
1715
1716    /// Implements the `backpressure.{inc,dec}` intrinsics.
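    ///
    /// The two intrinsics presumably reduce to closures along these lines
    /// (illustrative; the actual intrinsic call sites live elsewhere in this
    /// crate):
    ///
    /// ```ignore
    /// store.backpressure_modify(caller_instance, |n| n.checked_add(1))?; // backpressure.inc
    /// store.backpressure_modify(caller_instance, |n| n.checked_sub(1))?; // backpressure.dec
    /// // A `None` from the closure (i.e. overflow or underflow of the u16
    /// // counter) becomes the "backpressure counter overflow" error.
    /// ```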
1717    pub(crate) fn backpressure_modify(
1718        &mut self,
1719        caller_instance: RuntimeInstance,
1720        modify: impl FnOnce(u16) -> Option<u16>,
1721    ) -> Result<()> {
1722        let state = self.instance_state(caller_instance).concurrent_state();
1723        let old = state.backpressure;
1724        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1725        state.backpressure = new;
1726
1727        if old > 0 && new == 0 {
1728            // Backpressure was previously enabled and is now disabled; move any
1729            // newly-eligible guest calls to the "high priority" queue.
1730            self.partition_pending(caller_instance)?;
1731        }
1732
1733        Ok(())
1734    }
1735
1736    /// Resume the specified fiber, giving it exclusive access to this store
1737    /// while it runs.
1738    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1739        let old_thread = self.concurrent_state_mut().current_thread;
1740        log::trace!("resume_fiber: save current thread {old_thread:?}");
1741
1742        let fiber = fiber::resolve_or_release(self, fiber).await?;
1743
1744        self.set_thread(old_thread);
1745
1746        let state = self.concurrent_state_mut();
1747
1748        if let Some(ot) = old_thread.guest() {
1749            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1750        }
1751        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1752
1753        if let Some(mut fiber) = fiber {
1754            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1755            // See the `SuspendReason` documentation for what each case means.
1756            match state.suspend_reason.take().unwrap() {
1757                SuspendReason::NeedWork => {
1758                    if state.worker.is_none() {
1759                        state.worker = Some(fiber);
1760                    } else {
1761                        fiber.dispose(self);
1762                    }
1763                }
1764                SuspendReason::Yielding { thread, .. } => {
1765                    state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1766                    let instance = state.get_mut(thread.task)?.instance.index;
1767                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1768                }
1769                SuspendReason::ExplicitlySuspending { thread, .. } => {
1770                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1771                }
1772                SuspendReason::Waiting { set, thread, .. } => {
1773                    let old = state
1774                        .get_mut(set)?
1775                        .waiting
1776                        .insert(thread, WaitMode::Fiber(fiber));
1777                    assert!(old.is_none());
1778                }
1779            };
1780        } else {
1781            log::trace!("resume_fiber: fiber has exited");
1782        }
1783
1784        Ok(())
1785    }
1786
1787    /// Suspend the current fiber, storing the reason in
1788    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1789    /// it should be resumed.
1790    ///
1791    /// See the `SuspendReason` documentation for details.
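    ///
    /// The handshake with `Self::resume_fiber` is essentially a one-slot
    /// mailbox; with the fiber machinery removed it looks roughly like this
    /// (illustrative only):
    ///
    /// ```ignore
    /// struct State {
    ///     suspend_reason: Option<SuspendReason>,
    /// }
    ///
    /// impl State {
    ///     fn suspend(&mut self, reason: SuspendReason) {
    ///         assert!(self.suspend_reason.is_none());
    ///         self.suspend_reason = Some(reason);
    ///         // ... then switch away from this fiber ...
    ///     }
    ///
    ///     fn resumed(&mut self) -> SuspendReason {
    ///         // `resume_fiber` takes the reason back and files the suspended
    ///         // fiber accordingly (worker slot, ready queue, waitable set, ...).
    ///         self.suspend_reason.take().unwrap()
    ///     }
    /// }
    /// ```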
1792    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1793        log::trace!("suspend fiber: {reason:?}");
1794
1795        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1796        // pop the call context which manages resource borrows before suspending
1797        // and then push it again once we've resumed.
1798        let task = match &reason {
1799            SuspendReason::Yielding { thread, .. }
1800            | SuspendReason::Waiting { thread, .. }
1801            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1802            SuspendReason::NeedWork => None,
1803        };
1804
1805        let old_guest_thread = if task.is_some() {
1806            self.concurrent_state_mut().current_thread
1807        } else {
1808            CurrentThread::None
1809        };
1810
1811        // We should not have reached here unless either there's no current
1812        // task, or the current task is permitted to block.  In addition, we
1813        // special-case `thread.switch-to` and waiting for a subtask to go from
1814        // `starting` to `started`, both of which we consider non-blocking
1815        // operations despite requiring a suspend.
1816        assert!(
1817            matches!(
1818                reason,
1819                SuspendReason::ExplicitlySuspending {
1820                    skip_may_block_check: true,
1821                    ..
1822                } | SuspendReason::Waiting {
1823                    skip_may_block_check: true,
1824                    ..
1825                } | SuspendReason::Yielding {
1826                    skip_may_block_check: true,
1827                    ..
1828                }
1829            ) || old_guest_thread
1830                .guest()
1831                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1832                .unwrap_or(true)
1833        );
1834
1835        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1836        assert!(suspend_reason.is_none());
1837        *suspend_reason = Some(reason);
1838
1839        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1840
1841        if task.is_some() {
1842            self.set_thread(old_guest_thread);
1843        }
1844
1845        Ok(())
1846    }
1847
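    /// Join `waitable` to the current guest task's `sync_call_set`, suspend
    /// this fiber until an event is delivered for it, and then restore the
    /// waitable's original set membership.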
1848    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1849        let state = self.concurrent_state_mut();
1850        let caller = state.unwrap_current_guest_thread();
1851        let old_set = waitable.common(state)?.set;
1852        let set = state.get_mut(caller.task)?.sync_call_set;
1853        waitable.join(state, Some(set))?;
1854        self.suspend(SuspendReason::Waiting {
1855            set,
1856            thread: caller,
1857            skip_may_block_check: false,
1858        })?;
1859        let state = self.concurrent_state_mut();
1860        waitable.join(state, old_set)
1861    }
1862}
1863
1864impl Instance {
1865    /// Get the next pending event for the specified task and (optional)
1866    /// waitable set, along with the waitable handle if applicable.
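    ///
    /// A pending task-level event takes priority over events queued on the
    /// waitables in `set` (`Event::Cancelled` is only delivered that way when
    /// `cancellable`); stripped of the table plumbing, the selection is
    /// roughly (illustrative types only):
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// fn next_event(
    ///     task_event: &mut Option<u32>,
    ///     ready: &mut BTreeMap<u32, u32>, // waitable handle -> event
    /// ) -> Option<(u32, Option<u32>)> {
    ///     if let Some(event) = task_event.take() {
    ///         Some((event, None)) // task-level event; no waitable handle
    ///     } else {
    ///         ready.pop_first().map(|(handle, event)| (event, Some(handle)))
    ///     }
    /// }
    /// ```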
1867    fn get_event(
1868        self,
1869        store: &mut StoreOpaque,
1870        guest_task: TableId<GuestTask>,
1871        set: Option<TableId<WaitableSet>>,
1872        cancellable: bool,
1873    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1874        let state = store.concurrent_state_mut();
1875
1876        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1877            log::trace!("deliver event {event:?} to {guest_task:?}");
1878
1879            if cancellable || !matches!(event, Event::Cancelled) {
1880                return Ok(Some((event, None)));
1881            } else {
1882                state.get_mut(guest_task)?.event = Some(event);
1883            }
1884        }
1885
1886        Ok(
1887            if let Some((set, waitable)) = set
1888                .and_then(|set| {
1889                    state
1890                        .get_mut(set)
1891                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1892                        .transpose()
1893                })
1894                .transpose()?
1895            {
1896                let common = waitable.common(state)?;
1897                let handle = common.handle.unwrap();
1898                let event = common.event.take().unwrap();
1899
1900                log::trace!(
1901                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1902                );
1903
1904                waitable.on_delivery(store, self, event);
1905
1906                Some((event, Some((waitable, handle))))
1907            } else {
1908                None
1909            },
1910        )
1911    }
1912
1913    /// Handle the `CallbackCode` returned from an async-lifted export or its
1914    /// callback.
1915    ///
1916    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1917    /// using `handle_guest_call`.
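    ///
    /// The raw `code` packs the callback code proper together with a
    /// waitable-set handle; `unpack_callback_code` splits it roughly like so
    /// (see the Canonical ABI for the authoritative layout):
    ///
    /// ```ignore
    /// fn unpack_callback_code(code: u32) -> (u32, u32) {
    ///     (code & 0xf, code >> 4) // (callback code, waitable-set handle)
    /// }
    /// ```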
1918    fn handle_callback_code(
1919        self,
1920        store: &mut StoreOpaque,
1921        guest_thread: QualifiedThreadId,
1922        runtime_instance: RuntimeComponentInstanceIndex,
1923        code: u32,
1924    ) -> Result<Option<GuestCall>> {
1925        let (code, set) = unpack_callback_code(code);
1926
1927        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1928
1929        let state = store.concurrent_state_mut();
1930
1931        let get_set = |store: &mut StoreOpaque, handle| {
1932            if handle == 0 {
1933                bail!("invalid waitable-set handle");
1934            }
1935
1936            let set = store
1937                .instance_state(RuntimeInstance {
1938                    instance: self.id().instance(),
1939                    index: runtime_instance,
1940                })
1941                .handle_table()
1942                .waitable_set_rep(handle)?;
1943
1944            Ok(TableId::<WaitableSet>::new(set))
1945        };
1946
1947        Ok(match code {
1948            callback_code::EXIT => {
1949                log::trace!("implicit thread {guest_thread:?} completed");
1950                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1951                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1952                if task.threads.is_empty() && !task.returned_or_cancelled() {
1953                    bail!(Trap::NoAsyncResult);
1954                }
1955                match &task.caller {
1956                    Caller::Host { .. } => {
1957                        if task.ready_to_delete() {
1958                            Waitable::Guest(guest_thread.task)
1959                                .delete_from(store.concurrent_state_mut())?;
1960                        }
1961                    }
1962                    Caller::Guest { .. } => {
1963                        task.exited = true;
1964                        task.callback = None;
1965                    }
1966                }
1967                None
1968            }
1969            callback_code::YIELD => {
1970                let task = state.get_mut(guest_thread.task)?;
1971                // If an `Event::Cancelled` is pending, we'll deliver that;
1972                // otherwise, we'll deliver `Event::None`.  Note that
1973                // `GuestTask::event` is only ever set to one of those two
1974                // `Event` variants.
1975                if let Some(event) = task.event {
1976                    assert!(matches!(event, Event::None | Event::Cancelled));
1977                } else {
1978                    task.event = Some(Event::None);
1979                }
1980                let call = GuestCall {
1981                    thread: guest_thread,
1982                    kind: GuestCallKind::DeliverEvent {
1983                        instance: self,
1984                        set: None,
1985                    },
1986                };
1987                if state.may_block(guest_thread.task) {
1988                    // Push this thread onto the "low priority" queue so it runs
1989                    // after any other threads have had a chance to run.
1990                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
1991                    None
1992                } else {
1993                    // Yielding in a non-blocking context is defined as a no-op
1994                    // according to the spec, so we must run this thread
1995                    // immediately without allowing any others to run.
1996                    Some(call)
1997                }
1998            }
1999            callback_code::WAIT => {
2000                // The task may only return `WAIT` if it was created for a call
2001                // to an async export.  Otherwise, we'll trap.
2002                state.check_blocking_for(guest_thread.task)?;
2003
2004                let set = get_set(store, set)?;
2005                let state = store.concurrent_state_mut();
2006
2007                if state.get_mut(guest_thread.task)?.event.is_some()
2008                    || !state.get_mut(set)?.ready.is_empty()
2009                {
2010                    // An event is immediately available; deliver it ASAP.
2011                    state.push_high_priority(WorkItem::GuestCall(
2012                        runtime_instance,
2013                        GuestCall {
2014                            thread: guest_thread,
2015                            kind: GuestCallKind::DeliverEvent {
2016                                instance: self,
2017                                set: Some(set),
2018                            },
2019                        },
2020                    ));
2021                } else {
2022                    // No event is immediately available.
2023                    //
2024                    // We're waiting, so register to be woken up when an event
2025                    // is published for this waitable set.
2026                    //
2027                    // Here we also set `GuestTask::wake_on_cancel` which allows
2028                    // `subtask.cancel` to interrupt the wait.
2029                    let old = state
2030                        .get_mut(guest_thread.thread)?
2031                        .wake_on_cancel
2032                        .replace(set);
2033                    assert!(old.is_none());
2034                    let old = state
2035                        .get_mut(set)?
2036                        .waiting
2037                        .insert(guest_thread, WaitMode::Callback(self));
2038                    assert!(old.is_none());
2039                }
2040                None
2041            }
2042            _ => bail!("unsupported callback code: {code}"),
2043        })
2044    }
2045
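    /// Remove the specified guest thread from its instance's thread handle
    /// table, delete it from the concurrent state, and detach it from its
    /// task.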
2046    fn cleanup_thread(
2047        self,
2048        store: &mut StoreOpaque,
2049        guest_thread: QualifiedThreadId,
2050        runtime_instance: RuntimeComponentInstanceIndex,
2051    ) -> Result<()> {
2052        let guest_id = store
2053            .concurrent_state_mut()
2054            .get_mut(guest_thread.thread)?
2055            .instance_rep;
2056        store
2057            .instance_state(RuntimeInstance {
2058                instance: self.id().instance(),
2059                index: runtime_instance,
2060            })
2061            .thread_handle_table()
2062            .guest_thread_remove(guest_id.unwrap())?;
2063
2064        store.concurrent_state_mut().delete(guest_thread.thread)?;
2065        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2066        task.threads.remove(&guest_thread.thread);
2067        Ok(())
2068    }
2069
2070    /// Add the specified guest call to the "high priority" work item queue, to
2071    /// be started as soon as backpressure and/or reentrance rules allow.
2072    ///
2073    /// SAFETY: The raw pointer arguments must be valid references to guest
2074    /// functions (with the appropriate signatures) when the closures queued by
2075    /// this function are called.
2076    unsafe fn queue_call<T: 'static>(
2077        self,
2078        mut store: StoreContextMut<T>,
2079        guest_thread: QualifiedThreadId,
2080        callee: SendSyncPtr<VMFuncRef>,
2081        param_count: usize,
2082        result_count: usize,
2083        async_: bool,
2084        callback: Option<SendSyncPtr<VMFuncRef>>,
2085        post_return: Option<SendSyncPtr<VMFuncRef>>,
2086    ) -> Result<()> {
2087        /// Return a closure which will call the specified function in the scope
2088        /// of the specified task.
2089        ///
2090        /// This will use `GuestTask::lower_params` to lower the parameters, but
2091        /// will not lift the result; instead, it returns a
2092        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2093        /// any, may be lifted.  Note that an async-lifted export will have
2094        /// returned its result using the `task.return` intrinsic (or not
2095        /// returned a result at all, in the case of `task.cancel`), in which
2096        /// case the "result" of this call will either be a callback code or
2097        /// nothing.
2098        ///
2099        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2100        /// the returned closure is called.
2101        unsafe fn make_call<T: 'static>(
2102            store: StoreContextMut<T>,
2103            guest_thread: QualifiedThreadId,
2104            callee: SendSyncPtr<VMFuncRef>,
2105            param_count: usize,
2106            result_count: usize,
2107        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2108        + Send
2109        + Sync
2110        + 'static
2111        + use<T> {
2112            let token = StoreToken::new(store);
2113            move |store: &mut dyn VMStore| {
2114                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2115
2116                store
2117                    .concurrent_state_mut()
2118                    .get_mut(guest_thread.thread)?
2119                    .state = GuestThreadState::Running;
2120                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2121                let lower = task.lower_params.take().unwrap();
2122
2123                lower(store, &mut storage[..param_count])?;
2124
2125                let mut store = token.as_context_mut(store);
2126
2127                // SAFETY: Per the contract documented for `make_call`,
2128                // `callee` must be a valid pointer.
2129                unsafe {
2130                    crate::Func::call_unchecked_raw(
2131                        &mut store,
2132                        callee.as_non_null(),
2133                        NonNull::new(
2134                            &mut storage[..param_count.max(result_count)]
2135                                as *mut [MaybeUninit<ValRaw>] as _,
2136                        )
2137                        .unwrap(),
2138                    )?;
2139                }
2140
2141                Ok(storage)
2142            }
2143        }
2144
2145        // SAFETY: Per the contract described in this function documentation,
2146        // the `callee` pointer which `call` closes over must be valid when
2147        // called by the closure we queue below.
2148        let call = unsafe {
2149            make_call(
2150                store.as_context_mut(),
2151                guest_thread,
2152                callee,
2153                param_count,
2154                result_count,
2155            )
2156        };
2157
2158        let callee_instance = store
2159            .0
2160            .concurrent_state_mut()
2161            .get_mut(guest_thread.task)?
2162            .instance;
2163
2164        let fun = if callback.is_some() {
2165            assert!(async_);
2166
2167            Box::new(move |store: &mut dyn VMStore| {
2168                self.add_guest_thread_to_instance_table(
2169                    guest_thread.thread,
2170                    store,
2171                    callee_instance.index,
2172                )?;
2173                let old_thread = store.set_thread(guest_thread);
2174                log::trace!(
2175                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2176                );
2177
2178                store.enter_instance(callee_instance);
2179
2180                // SAFETY: See the documentation for `make_call` to review the
2181                // contract we must uphold for `call` here.
2182                //
2183                // Per the contract described in the `queue_call`
2184                // documentation, the `callee` pointer which `call` closes
2185                // over must be valid.
2186                let storage = call(store)?;
2187
2188                store.exit_instance(callee_instance)?;
2189
2190                store.set_thread(old_thread);
2191                let state = store.concurrent_state_mut();
2192                old_thread
2193                    .guest()
2194                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2195                log::trace!("stackless call: restored {old_thread:?} as current thread");
2196
2197                // SAFETY: `wasmparser` will have validated that the callback
2198                // function returns an `i32` result.
2199                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2200
2201                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2202            })
2203                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2204        } else {
2205            let token = StoreToken::new(store.as_context_mut());
2206            Box::new(move |store: &mut dyn VMStore| {
2207                self.add_guest_thread_to_instance_table(
2208                    guest_thread.thread,
2209                    store,
2210                    callee_instance.index,
2211                )?;
2212                let old_thread = store.set_thread(guest_thread);
2213                log::trace!(
2214                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2215                );
2216                let flags = self.id().get(store).instance_flags(callee_instance.index);
2217
2218                // Unless this is a callback-less (i.e. stackful)
2219                // async-lifted export, we need to record that the instance
2220                // cannot be entered until the call returns.
2221                if !async_ {
2222                    store.enter_instance(callee_instance);
2223                }
2224
2225                // SAFETY: See the documentation for `make_call` to review the
2226                // contract we must uphold for `call` here.
2227                //
2228                // Per the contract described in the `queue_call`
2229                // documentation, the `callee` pointer which `call` closes
2230                // over must be valid.
2231                let storage = call(store)?;
2232
2233                if async_ {
2234                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2235                    if task.threads.len() == 1 && !task.returned_or_cancelled() {
2236                        bail!(Trap::NoAsyncResult);
2237                    }
2238                } else {
2239                    // This is a sync-lifted export, so now is when we lift the
2240                    // result, call the post-return function (if any),
2241                    // and finally notify any current or future waiters that the
2242                    // subtask has returned.
2243
2244                    let lift = {
2245                        store.exit_instance(callee_instance)?;
2246
2247                        let state = store.concurrent_state_mut();
2248                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2249
2250                        state
2251                            .get_mut(guest_thread.task)?
2252                            .lift_result
2253                            .take()
2254                            .unwrap()
2255                    };
2256
2257                    // SAFETY: `result_count` represents the number of core Wasm
2258                    // results returned, per `wasmparser`.
2259                    let result = (lift.lift)(store, unsafe {
2260                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2261                            &storage[..result_count],
2262                        )
2263                    })?;
2264
2265                    let post_return_arg = match result_count {
2266                        0 => ValRaw::i32(0),
2267                        // SAFETY: `result_count` represents the number of
2268                        // core Wasm results returned, per `wasmparser`.
2269                        1 => unsafe { storage[0].assume_init() },
2270                        _ => unreachable!(),
2271                    };
2272
2273                    unsafe {
2274                        call_post_return(
2275                            token.as_context_mut(store),
2276                            post_return.map(|v| v.as_non_null()),
2277                            post_return_arg,
2278                            flags,
2279                        )?;
2280                    }
2281
2282                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2283                }
2284
2285                // This is a callback-less call, so the implicit thread has now completed.
2286                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2287
2288                store.set_thread(old_thread);
2289
2290                let state = store.concurrent_state_mut();
2291                let task = state.get_mut(guest_thread.task)?;
2292
2293                match &task.caller {
2294                    Caller::Host { .. } => {
2295                        if task.ready_to_delete() {
2296                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2297                        }
2298                    }
2299                    Caller::Guest { .. } => {
2300                        task.exited = true;
2301                    }
2302                }
2303
2304                Ok(None)
2305            })
2306        };
2307
2308        store
2309            .0
2310            .concurrent_state_mut()
2311            .push_high_priority(WorkItem::GuestCall(
2312                callee_instance.index,
2313                GuestCall {
2314                    thread: guest_thread,
2315                    kind: GuestCallKind::StartImplicit(fun),
2316                },
2317            ));
2318
2319        Ok(())
2320    }
2321
2322    /// Prepare (but do not start) a guest->guest call.
2323    ///
2324    /// This is called from fused adapter code generated in
2325    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2326    /// are synthesized Wasm functions which move the parameters from the caller
2327    /// to the callee and the result from the callee to the caller,
2328    /// respectively.  The adapter will call `Self::start_call` immediately
2329    /// after calling this function.
2330    ///
2331    /// SAFETY: All the pointer arguments must be valid pointers to guest
2332    /// entities (and with the expected signatures for the function references
2333    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2334    unsafe fn prepare_call<T: 'static>(
2335        self,
2336        mut store: StoreContextMut<T>,
2337        start: *mut VMFuncRef,
2338        return_: *mut VMFuncRef,
2339        caller_instance: RuntimeComponentInstanceIndex,
2340        callee_instance: RuntimeComponentInstanceIndex,
2341        task_return_type: TypeTupleIndex,
2342        callee_async: bool,
2343        memory: *mut VMMemoryDefinition,
2344        string_encoding: u8,
2345        caller_info: CallerInfo,
2346    ) -> Result<()> {
2347        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2348            // A task may only call an async-typed function via a sync lower if
2349            // it was created by a call to an async export.  Otherwise, we'll
2350            // trap.
2351            store.0.check_blocking()?;
2352        }
2353
2354        enum ResultInfo {
2355            Heap { results: u32 },
2356            Stack { result_count: u32 },
2357        }
2358
2359        let result_info = match &caller_info {
2360            CallerInfo::Async {
2361                has_result: true,
2362                params,
2363            } => ResultInfo::Heap {
2364                results: params.last().unwrap().get_u32(),
2365            },
2366            CallerInfo::Async {
2367                has_result: false, ..
2368            } => ResultInfo::Stack { result_count: 0 },
2369            CallerInfo::Sync {
2370                result_count,
2371                params,
2372            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2373                results: params.last().unwrap().get_u32(),
2374            },
2375            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2376                result_count: *result_count,
2377            },
2378        };
2379
2380        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2381
2382        // Create a new guest task for the call, closing over the `start` and
2383        // `return_` functions to lift the parameters and lower the result,
2384        // respectively.
2385        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2386        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2387        let token = StoreToken::new(store.as_context_mut());
2388        let state = store.0.concurrent_state_mut();
2389        let old_thread = state.unwrap_current_guest_thread();
2390
2391        assert_eq!(
2392            state.get_mut(old_thread.task)?.instance,
2393            RuntimeInstance {
2394                instance: self.id().instance(),
2395                index: caller_instance,
2396            }
2397        );
2398
2399        let new_task = GuestTask::new(
2400            state,
2401            Box::new(move |store, dst| {
2402                let mut store = token.as_context_mut(store);
2403                assert!(dst.len() <= MAX_FLAT_PARAMS);
2404                // The `+ 1` here accounts for the return pointer, if any:
2405                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2406                let count = match caller_info {
2407                    // Async callers, if they have a result, use the last
2408                    // parameter as a return pointer so chop that off if
2409                    // relevant here.
2410                    CallerInfo::Async { params, has_result } => {
2411                        let params = &params[..params.len() - usize::from(has_result)];
2412                        for (param, src) in params.iter().zip(&mut src) {
2413                            src.write(*param);
2414                        }
2415                        params.len()
2416                    }
2417
2418                    // Sync callers forward everything directly.
2419                    CallerInfo::Sync { params, .. } => {
2420                        for (param, src) in params.iter().zip(&mut src) {
2421                            src.write(*param);
2422                        }
2423                        params.len()
2424                    }
2425                };
2426                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2427                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2428                // how it was constructed (see
2429                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2430                // for details) we know it takes `count` parameters and returns
2431                // `dst.len()` results.
2432                unsafe {
2433                    crate::Func::call_unchecked_raw(
2434                        &mut store,
2435                        start.as_non_null(),
2436                        NonNull::new(
2437                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2438                        )
2439                        .unwrap(),
2440                    )?;
2441                }
2442                dst.copy_from_slice(&src[..dst.len()]);
2443                let state = store.0.concurrent_state_mut();
2444                Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2445                    state,
2446                    Some(Event::Subtask {
2447                        status: Status::Started,
2448                    }),
2449                )?;
2450                Ok(())
2451            }),
2452            LiftResult {
2453                lift: Box::new(move |store, src| {
2454                    // SAFETY: See comment in closure passed as `lower_params`
2455                    // parameter above.
2456                    let mut store = token.as_context_mut(store);
2457                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2458                    if let ResultInfo::Heap { results } = &result_info {
2459                        my_src.push(ValRaw::u32(*results));
2460                    }
2461                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2462                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2463                    // on how it was constructed (see
2464                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2465                    // for details) we know it takes `src.len()` parameters and
2466                    // returns up to 1 result.
2467                    unsafe {
2468                        crate::Func::call_unchecked_raw(
2469                            &mut store,
2470                            return_.as_non_null(),
2471                            my_src.as_mut_slice().into(),
2472                        )?;
2473                    }
2474                    let state = store.0.concurrent_state_mut();
2475                    let thread = state.unwrap_current_guest_thread();
2476                    if sync_caller {
2477                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2478                            if let ResultInfo::Stack { result_count } = &result_info {
2479                                match result_count {
2480                                    0 => None,
2481                                    1 => Some(my_src[0]),
2482                                    _ => unreachable!(),
2483                                }
2484                            } else {
2485                                None
2486                            },
2487                        );
2488                    }
2489                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2490                }),
2491                ty: task_return_type,
2492                memory: NonNull::new(memory).map(SendSyncPtr::new),
2493                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2494            },
2495            Caller::Guest { thread: old_thread },
2496            None,
2497            RuntimeInstance {
2498                instance: self.id().instance(),
2499                index: callee_instance,
2500            },
2501            callee_async,
2502        )?;
2503
2504        let guest_task = state.push(new_task)?;
2505        let new_thread = GuestThread::new_implicit(guest_task);
2506        let guest_thread = state.push(new_thread)?;
2507        state.get_mut(guest_task)?.threads.insert(guest_thread);
2508
2509        store
2510            .0
2511            .concurrent_state_mut()
2512            .get_mut(old_thread.task)?
2513            .subtasks
2514            .insert(guest_task);
2515
2516        // Make the new thread the current one so that `Self::start_call` knows
2517        // which one to start.
2518        store.0.set_thread(QualifiedThreadId {
2519            task: guest_task,
2520            thread: guest_thread,
2521        });
2522        log::trace!(
2523            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2524        );
2525
2526        Ok(())
2527    }
2528
2529    /// Call the specified callback function for an async-lifted export.
2530    ///
2531    /// SAFETY: `function` must be a valid reference to a guest function of the
2532    /// correct signature for a callback.
2533    unsafe fn call_callback<T>(
2534        self,
2535        mut store: StoreContextMut<T>,
2536        function: SendSyncPtr<VMFuncRef>,
2537        event: Event,
2538        handle: u32,
2539    ) -> Result<u32> {
2540        let (ordinal, result) = event.parts();
2541        let params = &mut [
2542            ValRaw::u32(ordinal),
2543            ValRaw::u32(handle),
2544            ValRaw::u32(result),
2545        ];
2546        // SAFETY: `function` is a valid `*mut VMFuncRef` from either
2547        // `wasmtime-cranelift`-generated fused adapter code or
2548        // `component::Options`.  Per `wasmparser` callback signature
2549        // validation, we know it takes three parameters and returns one.
2550        unsafe {
2551            crate::Func::call_unchecked_raw(
2552                &mut store,
2553                function.as_non_null(),
2554                params.as_mut_slice().into(),
2555            )?;
2556        }
2557        Ok(params[0].get_u32())
2558    }
2559
2560    /// Start a guest->guest call previously prepared using
2561    /// `Self::prepare_call`.
2562    ///
2563    /// This is called from fused adapter code generated in
2564    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2565    /// this function immediately after calling `Self::prepare_call`.
2566    ///
2567    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2568    /// functions with the appropriate signatures for the current guest task.
2569    /// If this is a call to an async-lowered import, the actual call may be
2570    /// deferred and run after this function returns, in which case the pointer
2571    /// arguments must also be valid when the call happens.
2572    unsafe fn start_call<T: 'static>(
2573        self,
2574        mut store: StoreContextMut<T>,
2575        callback: *mut VMFuncRef,
2576        post_return: *mut VMFuncRef,
2577        callee: *mut VMFuncRef,
2578        param_count: u32,
2579        result_count: u32,
2580        flags: u32,
2581        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2582    ) -> Result<u32> {
2583        let token = StoreToken::new(store.as_context_mut());
2584        let async_caller = storage.is_none();
2585        let state = store.0.concurrent_state_mut();
2586        let guest_thread = state.unwrap_current_guest_thread();
2587        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2588        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2589        let param_count = usize::try_from(param_count).unwrap();
2590        assert!(param_count <= MAX_FLAT_PARAMS);
2591        let result_count = usize::try_from(result_count).unwrap();
2592        assert!(result_count <= MAX_FLAT_RESULTS);
2593
2594        let task = state.get_mut(guest_thread.task)?;
2595        if !callback.is_null() {
2596            // We're calling an async-lifted export with a callback, so store
2597            // the callback and related context as part of the task so we can
2598            // call it later when needed.
2599            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2600            task.callback = Some(Box::new(move |store, event, handle| {
2601                let store = token.as_context_mut(store);
2602                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2603            }));
2604        }
2605
2606        let Caller::Guest { thread: caller } = &task.caller else {
2607            // As of this writing, `start_call` is only used for guest->guest
2608            // calls.
2609            unreachable!()
2610        };
2611        let caller = *caller;
2612        let caller_instance = state.get_mut(caller.task)?.instance;
2613
2614        // Queue the call as a "high priority" work item.
2615        unsafe {
2616            self.queue_call(
2617                store.as_context_mut(),
2618                guest_thread,
2619                callee,
2620                param_count,
2621                result_count,
2622                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2623                NonNull::new(callback).map(SendSyncPtr::new),
2624                NonNull::new(post_return).map(SendSyncPtr::new),
2625            )?;
2626        }
2627
2628        let state = store.0.concurrent_state_mut();
2629
2630        // Use the caller's `GuestTask::sync_call_set` to register interest in
2631        // the subtask...
2632        let guest_waitable = Waitable::Guest(guest_thread.task);
2633        let old_set = guest_waitable.common(state)?.set;
2634        let set = state.get_mut(caller.task)?.sync_call_set;
2635        guest_waitable.join(state, Some(set))?;
2636
2637        // ... and suspend this fiber temporarily while we wait for it to start.
2638        //
2639        // Note that we _could_ call the callee directly using the current fiber
2640        // rather than suspend this one, but that would make reasoning about the
2641        // event loop more complicated and is probably only worth doing if
2642        // there's a measurable performance benefit.  In addition, it would mean
2643        // blocking the caller if the callee calls a blocking sync-lowered
2644        // import, and as of this writing the spec says we must not do that.
2645        //
2646        // Alternatively, the fused adapter code could be modified to call the
2647        // callee directly without calling a host-provided intrinsic at all (in
2648        // which case it would need to do its own, inline backpressure checks,
2649        // etc.).  Again, we'd want to see a measurable performance benefit
2650        // before committing to such an optimization.  And again, we'd need to
2651        // update the spec to allow that.
2652        let (status, waitable) = loop {
2653            store.0.suspend(SuspendReason::Waiting {
2654                set,
2655                thread: caller,
2656                // Normally, `StoreOpaque::suspend` would assert it's being
2657                // called from a context where blocking is allowed.  However, if
2658                // `async_caller` is `true`, we'll only "block" long enough for
2659                // the callee to start, i.e. we won't repeat this loop, so we
2660                // tell `suspend` it's okay even if we're not allowed to block.
2661                // Alternatively, if the callee is not an async function, then
2662                // we know it won't block anyway.
2663                skip_may_block_check: async_caller || !callee_async,
2664            })?;
2665
2666            let state = store.0.concurrent_state_mut();
2667
2668            log::trace!("taking event for {:?}", guest_thread.task);
2669            let event = guest_waitable.take_event(state)?;
2670            let Some(Event::Subtask { status }) = event else {
2671                unreachable!();
2672            };
2673
2674            log::trace!("status {status:?} for {:?}", guest_thread.task);
2675
2676            if status == Status::Returned {
2677                // It returned, so we can stop waiting.
2678                break (status, None);
2679            } else if async_caller {
2680                // It hasn't returned yet, but the caller is calling via an
2681                // async-lowered import, so we generate a handle for the task
2682                // waitable and return the status.
2683                let handle = store
2684                    .0
2685                    .instance_state(caller_instance)
2686                    .handle_table()
2687                    .subtask_insert_guest(guest_thread.task.rep())?;
2688                store
2689                    .0
2690                    .concurrent_state_mut()
2691                    .get_mut(guest_thread.task)?
2692                    .common
2693                    .handle = Some(handle);
2694                break (status, Some(handle));
2695            } else {
2696                // The callee hasn't returned yet, and the caller is calling via
2697                // a sync-lowered import, so we loop and keep waiting until the
2698                // callee returns.
2699            }
2700        };
2701
2702        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2703
2704        // Reset the current thread to point to the caller as it resumes control.
2705        store.0.set_thread(caller);
2706        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2707        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2708
2709        if let Some(storage) = storage {
2710            // The caller used a sync-lowered import to call an async-lifted
2711            // export, in which case the result, if any, has been stashed in
2712            // `GuestTask::sync_result`.
2713            let state = store.0.concurrent_state_mut();
2714            let task = state.get_mut(guest_thread.task)?;
2715            if let Some(result) = task.sync_result.take() {
2716                if let Some(result) = result {
2717                    storage[0] = MaybeUninit::new(result);
2718                }
2719
2720                if task.exited && task.ready_to_delete() {
2721                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2722                }
2723            }
2724        }
2725
2726        Ok(status.pack(waitable))
2727    }
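    // Illustrative sketch, not part of the runtime: the `u32` returned above is
    // produced by `Status::pack`, which combines the status with the optional
    // waitable handle.  Assuming, purely for illustration, a low-4-bit status
    // encoding (the authoritative layout is whatever `Status::pack` implements),
    // a caller-side decoder would look roughly like:
    //
    //     fn unpack(packed: u32) -> (u32, u32) {
    //         let status = packed & 0xf; // assumed: `Status` discriminant in low bits
    //         let handle = packed >> 4;  // assumed: waitable handle, or 0 if none
    //         (status, handle)
    //     }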
2728
2729    /// Poll the specified future once on behalf of a guest->host call using an
2730    /// async-lowered import.
2731    ///
2732    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2733    /// `Pending`, add it to the set of futures to be polled as part of this
2734    /// instance's event loop until it completes, and then return
2735    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2736    ///
2737    /// Whether the future returns `Ready` immediately or later, the `lower`
2738    /// function will be used to lower the result, if any, into the guest caller's
2739    /// stack and linear memory unless the task has been cancelled.
2740    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2741        self,
2742        mut store: StoreContextMut<'_, T>,
2743        future: impl Future<Output = Result<R>> + Send + 'static,
2744        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2745    ) -> Result<Option<u32>> {
2746        let token = StoreToken::new(store.as_context_mut());
2747        let state = store.0.concurrent_state_mut();
2748        let task = state.unwrap_current_host_thread();
2749
2750        // Create an abortable future which hooks calls to poll and manages call
2751        // context state for the future.
2752        let (join_handle, future) = JoinHandle::run(future);
2753        {
2754            let task = state.get_mut(task)?;
2755            assert!(task.join_handle.is_none());
2756            task.join_handle = Some(join_handle);
2757        }
2758
2759        let mut future = Box::pin(future);
2760
2761        // Finally, poll the future.  We can use a dummy `Waker` here because
2762        // we'll add the future to `ConcurrentState::futures` and poll it
2763        // automatically from the event loop if it doesn't complete immediately
2764        // here.
2765        let poll = tls::set(store.0, || {
2766            future
2767                .as_mut()
2768                .poll(&mut Context::from_waker(&Waker::noop()))
2769        });
2770
2771        match poll {
2772            // It finished immediately; lower the result and delete the task.
2773            Poll::Ready(Some(result)) => {
2774                lower(store.as_context_mut(), result?)?;
2775                return Ok(None);
2776            }
2777
2778            // Shouldn't be possible since the future can't have been cancelled
2779            // via the `join_handle` yet.
2780            Poll::Ready(None) => unreachable!(),
2781
2782            // Future isn't ready yet, so fall through.
2783            Poll::Pending => {}
2784        }
2785
2786        // It hasn't finished yet; add the future to
2787        // `ConcurrentState::futures` so it will be polled by the event
2788        // loop and allocate a waitable handle to return to the guest.
2789
2790        // Wrap the future in a closure responsible for lowering the result into
2791        // the guest's stack and memory, as well as notifying any waiters that
2792        // the task returned.
2793        let future = Box::pin(async move {
2794            let result = match future.await {
2795                Some(result) => result?,
2796                // Task was cancelled; nothing left to do.
2797                None => return Ok(()),
2798            };
2799            let on_complete = move |store: &mut dyn VMStore| {
2800                // Restore the `current_thread` to be the host task so `lower`
2801                // knows how to manipulate borrows and which scope of borrows
2802                // to check.
2803                let mut store = token.as_context_mut(store);
2804                let state = store.0.concurrent_state_mut();
2805                assert!(state.current_thread.is_none());
2806                store.0.set_thread(task);
2807
2808                lower(store.as_context_mut(), result)?;
2809                let state = store.0.concurrent_state_mut();
2810                state.get_mut(task)?.join_handle.take();
2811                Waitable::Host(task).set_event(
2812                    state,
2813                    Some(Event::Subtask {
2814                        status: Status::Returned,
2815                    }),
2816                )?;
2817
2818                // Go back to "no current thread" at the end.
2819                store.0.set_thread(CurrentThread::None);
2820                Ok(())
2821            };
2822
2823            // Here we schedule a task to run on a worker fiber to do the
2824            // lowering since it may involve a call to the guest's realloc
2825            // function. This is necessary because calling the guest while
2826            // there are host embedder frames on the stack is unsound.
2827            tls::get(move |store| {
2828                store
2829                    .concurrent_state_mut()
2830                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2831                        on_complete,
2832                    ))));
2833                Ok(())
2834            })
2835        });
2836
2837        // Make this task visible to the guest and then record what it
2838        // was made visible as.
2839        let state = store.0.concurrent_state_mut();
2840        state.push_future(future);
2841        let caller = state.get_mut(task)?.caller;
2842        let instance = state.get_mut(caller.task)?.instance;
2843        let handle = store
2844            .0
2845            .instance_state(instance)
2846            .handle_table()
2847            .subtask_insert_host(task.rep())?;
2848        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2849        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2850
2851        // Restore the currently running thread to this host task's
2852        // caller. Note that the host task isn't deallocated as it's
2853        // within the store and will get deallocated later.
2854        store.0.set_thread(caller);
2855        Ok(Some(handle))
2856    }
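    // Minimal sketch of the "poll once, defer if pending" shape used above,
    // written against plain `futures` primitives; `poll_once` is a hypothetical
    // name and not part of this module:
    //
    //     use futures::task::noop_waker_ref;
    //     use std::{future::Future, pin::Pin, task::{Context, Poll}};
    //
    //     fn poll_once<T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send>>) -> Poll<T> {
    //         // A noop waker suffices: if the future is pending it is handed to
    //         // the event loop, which re-polls it rather than relying on wakeups.
    //         fut.as_mut().poll(&mut Context::from_waker(noop_waker_ref()))
    //     }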
2857
2858    /// Implements the `task.return` intrinsic, lifting the result for the
2859    /// current guest task.
2860    pub(crate) fn task_return(
2861        self,
2862        store: &mut dyn VMStore,
2863        ty: TypeTupleIndex,
2864        options: OptionsIndex,
2865        storage: &[ValRaw],
2866    ) -> Result<()> {
2867        let state = store.concurrent_state_mut();
2868        let guest_thread = state.unwrap_current_guest_thread();
2869        let lift = state
2870            .get_mut(guest_thread.task)?
2871            .lift_result
2872            .take()
2873            .ok_or_else(|| {
2874                format_err!("`task.return` or `task.cancel` called more than once for current task")
2875            })?;
2876        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2877
2878        let CanonicalOptions {
2879            string_encoding,
2880            data_model,
2881            ..
2882        } = &self.id().get(store).component().env_component().options[options];
2883
2884        let invalid = ty != lift.ty
2885            || string_encoding != &lift.string_encoding
2886            || match data_model {
2887                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2888                    Some(memory) => {
2889                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2890                        let actual = self.id().get(store).runtime_memory(memory);
2891                        expected != actual.as_ptr()
2892                    }
2893                    // Memory was not specified, which validation only allows when
2894                    // it isn't needed, so this is not invalid.
2895                    None => false,
2896                },
2897                // Always invalid as this isn't supported.
2898                CanonicalOptionsDataModel::Gc { .. } => true,
2899            };
2900
2901        if invalid {
2902            bail!("invalid `task.return` signature and/or options for current task");
2903        }
2904
2905        log::trace!("task.return for {guest_thread:?}");
2906
2907        let result = (lift.lift)(store, storage)?;
2908        self.task_complete(store, guest_thread.task, result, Status::Returned)
2909    }
2910
2911    /// Implements the `task.cancel` intrinsic.
2912    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2913        let state = store.concurrent_state_mut();
2914        let guest_thread = state.unwrap_current_guest_thread();
2915        let task = state.get_mut(guest_thread.task)?;
2916        if !task.cancel_sent {
2917            bail!("`task.cancel` called by task which has not been cancelled")
2918        }
2919        _ = task.lift_result.take().ok_or_else(|| {
2920            format_err!("`task.return` or `task.cancel` called more than once for current task")
2921        })?;
2922
2923        assert!(task.result.is_none());
2924
2925        log::trace!("task.cancel for {guest_thread:?}");
2926
2927        self.task_complete(
2928            store,
2929            guest_thread.task,
2930            Box::new(DummyResult),
2931            Status::ReturnCancelled,
2932        )
2933    }
2934
2935    /// Complete the specified guest task (i.e. indicate that it has either
2936    /// returned a (possibly empty) result or cancelled itself).
2937    ///
2938    /// This will return any resource borrows and notify any current or future
2939    /// waiters that the task has completed.
2940    fn task_complete(
2941        self,
2942        store: &mut StoreOpaque,
2943        guest_task: TableId<GuestTask>,
2944        result: Box<dyn Any + Send + Sync>,
2945        status: Status,
2946    ) -> Result<()> {
2947        store
2948            .component_resource_tables(Some(self))
2949            .validate_scope_exit()?;
2950
2951        let state = store.concurrent_state_mut();
2952        let task = state.get_mut(guest_task)?;
2953
2954        if let Caller::Host { tx, .. } = &mut task.caller {
2955            if let Some(tx) = tx.take() {
2956                _ = tx.send(result);
2957            }
2958        } else {
2959            task.result = Some(result);
2960            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2961        }
2962
2963        Ok(())
2964    }
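    // Sketch only: when the caller is the host (`Caller::Host`), the boxed result
    // above is delivered over a oneshot channel, conceptually like this (the
    // function name and `u32` payload are illustrative):
    //
    //     use futures::channel::oneshot;
    //     use std::any::Any;
    //
    //     async fn demo() {
    //         let (tx, rx) = oneshot::channel::<Box<dyn Any + Send + Sync>>();
    //         // Completing side: send the result exactly once; ignore the error
    //         // if the receiver has already gone away.
    //         let result: Box<dyn Any + Send + Sync> = Box::new(42u32);
    //         let _ = tx.send(result);
    //         // Host caller side: await the result.
    //         let _result = rx.await.ok();
    //     }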
2965
2966    /// Implements the `waitable-set.new` intrinsic.
2967    pub(crate) fn waitable_set_new(
2968        self,
2969        store: &mut StoreOpaque,
2970        caller_instance: RuntimeComponentInstanceIndex,
2971    ) -> Result<u32> {
2972        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2973        let handle = store
2974            .instance_state(RuntimeInstance {
2975                instance: self.id().instance(),
2976                index: caller_instance,
2977            })
2978            .handle_table()
2979            .waitable_set_insert(set.rep())?;
2980        log::trace!("new waitable set {set:?} (handle {handle})");
2981        Ok(handle)
2982    }
2983
2984    /// Implements the `waitable-set.drop` intrinsic.
2985    pub(crate) fn waitable_set_drop(
2986        self,
2987        store: &mut StoreOpaque,
2988        caller_instance: RuntimeComponentInstanceIndex,
2989        set: u32,
2990    ) -> Result<()> {
2991        let rep = store
2992            .instance_state(RuntimeInstance {
2993                instance: self.id().instance(),
2994                index: caller_instance,
2995            })
2996            .handle_table()
2997            .waitable_set_remove(set)?;
2998
2999        log::trace!("drop waitable set {rep} (handle {set})");
3000
3001        let set = store
3002            .concurrent_state_mut()
3003            .delete(TableId::<WaitableSet>::new(rep))?;
3004
3005        if !set.waiting.is_empty() {
3006            bail!("cannot drop waitable set with waiters");
3007        }
3008
3009        Ok(())
3010    }
3011
3012    /// Implements the `waitable.join` intrinsic.
3013    pub(crate) fn waitable_join(
3014        self,
3015        store: &mut StoreOpaque,
3016        caller_instance: RuntimeComponentInstanceIndex,
3017        waitable_handle: u32,
3018        set_handle: u32,
3019    ) -> Result<()> {
3020        let mut instance = self.id().get_mut(store);
3021        let waitable =
3022            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3023
3024        let set = if set_handle == 0 {
3025            None
3026        } else {
3027            let set = instance.instance_states().0[caller_instance]
3028                .handle_table()
3029                .waitable_set_rep(set_handle)?;
3030
3031            Some(TableId::<WaitableSet>::new(set))
3032        };
3033
3034        log::trace!(
3035            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3036        );
3037
3038        waitable.join(store.concurrent_state_mut(), set)
3039    }
3040
3041    /// Implements the `subtask.drop` intrinsic.
3042    pub(crate) fn subtask_drop(
3043        self,
3044        store: &mut StoreOpaque,
3045        caller_instance: RuntimeComponentInstanceIndex,
3046        task_id: u32,
3047    ) -> Result<()> {
3048        self.waitable_join(store, caller_instance, task_id, 0)?;
3049
3050        let (rep, is_host) = store
3051            .instance_state(RuntimeInstance {
3052                instance: self.id().instance(),
3053                index: caller_instance,
3054            })
3055            .handle_table()
3056            .subtask_remove(task_id)?;
3057
3058        let concurrent_state = store.concurrent_state_mut();
3059        let (waitable, expected_caller, delete) = if is_host {
3060            let id = TableId::<HostTask>::new(rep);
3061            let task = concurrent_state.get_mut(id)?;
3062            if task.join_handle.is_some() {
3063                bail!("cannot drop a subtask which has not yet resolved");
3064            }
3065            (Waitable::Host(id), task.caller, true)
3066        } else {
3067            let id = TableId::<GuestTask>::new(rep);
3068            let task = concurrent_state.get_mut(id)?;
3069            if task.lift_result.is_some() {
3070                bail!("cannot drop a subtask which has not yet resolved");
3071            }
3072            if let Caller::Guest { thread } = task.caller {
3073                (
3074                    Waitable::Guest(id),
3075                    thread,
3076                    concurrent_state.get_mut(id)?.exited,
3077                )
3078            } else {
3079                unreachable!()
3080            }
3081        };
3082
3083        waitable.common(concurrent_state)?.handle = None;
3084
3085        if waitable.take_event(concurrent_state)?.is_some() {
3086            bail!("cannot drop a subtask with an undelivered event");
3087        }
3088
3089        if delete {
3090            waitable.delete_from(concurrent_state)?;
3091        }
3092
3093        // Since waitables can neither be passed between instances nor forged,
3094        // this should never fail unless there's a bug in Wasmtime, but we check
3095        // here to be sure:
3096        assert_eq!(
3097            expected_caller,
3098            concurrent_state.unwrap_current_guest_thread(),
3099        );
3100        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3101        Ok(())
3102    }
3103
3104    /// Implements the `waitable-set.wait` intrinsic.
3105    pub(crate) fn waitable_set_wait(
3106        self,
3107        store: &mut StoreOpaque,
3108        options: OptionsIndex,
3109        set: u32,
3110        payload: u32,
3111    ) -> Result<u32> {
3112        if !self.options(store, options).async_ {
3113            // The caller may only call `waitable-set.wait` from an async task
3114            // (i.e. a task created via a call to an async export).
3115            // Otherwise, we'll trap.
3116            store.check_blocking()?;
3117        }
3118
3119        let &CanonicalOptions {
3120            cancellable,
3121            instance: caller_instance,
3122            ..
3123        } = &self.id().get(store).component().env_component().options[options];
3124        let rep = store
3125            .instance_state(RuntimeInstance {
3126                instance: self.id().instance(),
3127                index: caller_instance,
3128            })
3129            .handle_table()
3130            .waitable_set_rep(set)?;
3131
3132        self.waitable_check(
3133            store,
3134            cancellable,
3135            WaitableCheck::Wait,
3136            WaitableCheckParams {
3137                set: TableId::new(rep),
3138                options,
3139                payload,
3140            },
3141        )
3142    }
3143
3144    /// Implements the `waitable-set.poll` intrinsic.
3145    pub(crate) fn waitable_set_poll(
3146        self,
3147        store: &mut StoreOpaque,
3148        options: OptionsIndex,
3149        set: u32,
3150        payload: u32,
3151    ) -> Result<u32> {
3152        let &CanonicalOptions {
3153            cancellable,
3154            instance: caller_instance,
3155            ..
3156        } = &self.id().get(store).component().env_component().options[options];
3157        let rep = store
3158            .instance_state(RuntimeInstance {
3159                instance: self.id().instance(),
3160                index: caller_instance,
3161            })
3162            .handle_table()
3163            .waitable_set_rep(set)?;
3164
3165        self.waitable_check(
3166            store,
3167            cancellable,
3168            WaitableCheck::Poll,
3169            WaitableCheckParams {
3170                set: TableId::new(rep),
3171                options,
3172                payload,
3173            },
3174        )
3175    }
3176
3177    /// Implements the `thread.index` intrinsic.
3178    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3179        let thread_id = store
3180            .concurrent_state_mut()
3181            .unwrap_current_guest_thread()
3182            .thread;
3183        // The unwrap is safe because `instance_rep` must be `Some` by this point
3184        Ok(store
3185            .concurrent_state_mut()
3186            .get_mut(thread_id)?
3187            .instance_rep
3188            .unwrap())
3189    }
3190
3191    /// Implements the `thread.new-indirect` intrinsic.
3192    pub(crate) fn thread_new_indirect<T: 'static>(
3193        self,
3194        mut store: StoreContextMut<T>,
3195        runtime_instance: RuntimeComponentInstanceIndex,
3196        _func_ty_idx: TypeFuncIndex, // currently unused
3197        start_func_table_idx: RuntimeTableIndex,
3198        start_func_idx: u32,
3199        context: i32,
3200    ) -> Result<u32> {
3201        log::trace!("creating new thread");
3202
3203        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3204        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3205        let callee = instance
3206            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3207            .ok_or_else(|| {
3208                format_err!("the start function index points to an uninitialized function")
3209            })?;
3210        if callee.type_index(store.0) != start_func_ty.type_index() {
3211            bail!(
3212                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3213            );
3214        }
3215
3216        let token = StoreToken::new(store.as_context_mut());
3217        let start_func = Box::new(
3218            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3219                let old_thread = store.set_thread(guest_thread);
3220                log::trace!(
3221                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3222                );
3223
3224                let mut store = token.as_context_mut(store);
3225                let mut params = [ValRaw::i32(context)];
3226                // Use `call_unchecked` rather than `call` or `call_async`, as we don't want to
3227                // run the function on a separate fiber if we're running in an async store.
3228                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3229
3230                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3231                log::trace!("explicit thread {guest_thread:?} completed");
3232                let state = store.0.concurrent_state_mut();
3233                let task = state.get_mut(guest_thread.task)?;
3234                if task.threads.is_empty() && !task.returned_or_cancelled() {
3235                    bail!(Trap::NoAsyncResult);
3236                }
3237                store.0.set_thread(old_thread);
3238                let state = store.0.concurrent_state_mut();
3239                old_thread
3240                    .guest()
3241                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3242                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3243                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3244                }
3245                log::trace!("thread start: restored {old_thread:?} as current thread");
3246
3247                Ok(())
3248            },
3249        );
3250
3251        let state = store.0.concurrent_state_mut();
3252        let current_thread = state.unwrap_current_guest_thread();
3253        let parent_task = current_thread.task;
3254
3255        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3256        let thread_id = state.push(new_thread)?;
3257        state.get_mut(parent_task)?.threads.insert(thread_id);
3258
3259        log::trace!("new thread with id {thread_id:?} created");
3260
3261        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3262    }
3263
3264    pub(crate) fn resume_thread(
3265        self,
3266        store: &mut StoreOpaque,
3267        runtime_instance: RuntimeComponentInstanceIndex,
3268        thread_idx: u32,
3269        high_priority: bool,
3270        allow_ready: bool,
3271    ) -> Result<()> {
3272        let thread_id =
3273            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3274        let state = store.concurrent_state_mut();
3275        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3276        let thread = state.get_mut(guest_thread.thread)?;
3277
3278        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3279            GuestThreadState::NotStartedExplicit(start_func) => {
3280                log::trace!("starting thread {guest_thread:?}");
3281                let guest_call = WorkItem::GuestCall(
3282                    runtime_instance,
3283                    GuestCall {
3284                        thread: guest_thread,
3285                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3286                            start_func(store, guest_thread)
3287                        })),
3288                    },
3289                );
3290                store
3291                    .concurrent_state_mut()
3292                    .push_work_item(guest_call, high_priority);
3293            }
3294            GuestThreadState::Suspended(fiber) => {
3295                log::trace!("resuming thread {thread_id:?} that was suspended");
3296                store
3297                    .concurrent_state_mut()
3298                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3299            }
3300            GuestThreadState::Ready(fiber) if allow_ready => {
3301                log::trace!("resuming thread {thread_id:?} that was ready");
3302                thread.state = GuestThreadState::Ready(fiber);
3303                store
3304                    .concurrent_state_mut()
3305                    .promote_thread_work_item(guest_thread);
3306            }
3307            other => {
3308                thread.state = other;
3309                bail!("cannot resume thread which is not suspended");
3310            }
3311        }
3312        Ok(())
3313    }
3314
3315    fn add_guest_thread_to_instance_table(
3316        self,
3317        thread_id: TableId<GuestThread>,
3318        store: &mut StoreOpaque,
3319        runtime_instance: RuntimeComponentInstanceIndex,
3320    ) -> Result<u32> {
3321        let guest_id = store
3322            .instance_state(RuntimeInstance {
3323                instance: self.id().instance(),
3324                index: runtime_instance,
3325            })
3326            .thread_handle_table()
3327            .guest_thread_insert(thread_id.rep())?;
3328        store
3329            .concurrent_state_mut()
3330            .get_mut(thread_id)?
3331            .instance_rep = Some(guest_id);
3332        Ok(guest_id)
3333    }
3334
3335    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3336    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3337    pub(crate) fn suspension_intrinsic(
3338        self,
3339        store: &mut StoreOpaque,
3340        caller: RuntimeComponentInstanceIndex,
3341        cancellable: bool,
3342        yielding: bool,
3343        to_thread: SuspensionTarget,
3344    ) -> Result<WaitResult> {
3345        let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3346        if to_thread.is_none() {
3347            let state = store.concurrent_state_mut();
3348            if yielding {
3349                // This is a `thread.yield` call
3350                if !state.may_block(guest_thread.task) {
3351                    // In a non-blocking context, a `thread.yield` may trigger
3352                    // other threads in the same component instance to run.
3353                    if !state.promote_instance_local_thread_work_item(caller) {
3354                        // No other threads are runnable, so just return
3355                        return Ok(WaitResult::Completed);
3356                    }
3357                }
3358            } else {
3359                // The caller may only call `thread.suspend` from an async task
3360                // (i.e. a task created via a call to an async export).
3361                // Otherwise, we'll trap.
3362                store.check_blocking()?;
3363            }
3364        }
3365
3366        // There could be a pending cancellation from a previous uncancellable wait
3367        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3368            return Ok(WaitResult::Cancelled);
3369        }
3370
3371        match to_thread {
3372            SuspensionTarget::SomeSuspended(thread) => {
3373                self.resume_thread(store, caller, thread, true, false)?
3374            }
3375            SuspensionTarget::Some(thread) => {
3376                self.resume_thread(store, caller, thread, true, true)?
3377            }
3378            SuspensionTarget::None => { /* nothing to do */ }
3379        }
3380
3381        let reason = if yielding {
3382            SuspendReason::Yielding {
3383                thread: guest_thread,
3384                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3385                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3386                // panic if we called it in a non-blocking context.
3387                skip_may_block_check: to_thread.is_some(),
3388            }
3389        } else {
3390            SuspendReason::ExplicitlySuspending {
3391                thread: guest_thread,
3392                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3393                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3394                // panic if we called it in a non-blocking context.
3395                skip_may_block_check: to_thread.is_some(),
3396            }
3397        };
3398
3399        store.suspend(reason)?;
3400
3401        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3402            Ok(WaitResult::Cancelled)
3403        } else {
3404            Ok(WaitResult::Completed)
3405        }
3406    }
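    // Hypothetical sketch (derived from the comments and branches above, not used
    // by the runtime) of how the five intrinsics named in the doc comment map
    // onto the (`yielding`, `to_thread`) parameters handled here:
    //
    //     fn intrinsic_name(yielding: bool, target: &SuspensionTarget) -> &'static str {
    //         match (yielding, target) {
    //             (true, SuspensionTarget::None) => "thread.yield",
    //             (true, _) => "thread.yield-to-suspended",
    //             (false, SuspensionTarget::None) => "thread.suspend",
    //             (false, SuspensionTarget::Some(_)) => "thread.suspend-to",
    //             (false, SuspensionTarget::SomeSuspended(_)) => "thread.suspend-to-suspended",
    //         }
    //     }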
3407
3408    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3409    fn waitable_check(
3410        self,
3411        store: &mut StoreOpaque,
3412        cancellable: bool,
3413        check: WaitableCheck,
3414        params: WaitableCheckParams,
3415    ) -> Result<u32> {
3416        let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3417
3418        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3419
3420        let state = store.concurrent_state_mut();
3421        let task = state.get_mut(guest_thread.task)?;
3422
3423        // If we're waiting, and there are no events immediately available,
3424        // suspend the fiber until that changes.
3425        match &check {
3426            WaitableCheck::Wait => {
3427                let set = params.set;
3428
3429                if (task.event.is_none()
3430                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3431                    && state.get_mut(set)?.ready.is_empty()
3432                {
3433                    if cancellable {
3434                        let old = state
3435                            .get_mut(guest_thread.thread)?
3436                            .wake_on_cancel
3437                            .replace(set);
3438                        assert!(old.is_none());
3439                    }
3440
3441                    store.suspend(SuspendReason::Waiting {
3442                        set,
3443                        thread: guest_thread,
3444                        skip_may_block_check: false,
3445                    })?;
3446                }
3447            }
3448            WaitableCheck::Poll => {}
3449        }
3450
3451        log::trace!(
3452            "waitable check for {guest_thread:?}; set {:?}, part two",
3453            params.set
3454        );
3455
3456        // Deliver any pending events to the guest and return.
3457        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3458
3459        let (ordinal, handle, result) = match &check {
3460            WaitableCheck::Wait => {
3461                let (event, waitable) = event.unwrap();
3462                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3463                let (ordinal, result) = event.parts();
3464                (ordinal, handle, result)
3465            }
3466            WaitableCheck::Poll => {
3467                if let Some((event, waitable)) = event {
3468                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3469                    let (ordinal, result) = event.parts();
3470                    (ordinal, handle, result)
3471                } else {
3472                    log::trace!(
3473                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3474                        guest_thread.task,
3475                        params.set
3476                    );
3477                    let (ordinal, result) = Event::None.parts();
3478                    (ordinal, 0, result)
3479                }
3480            }
3481        };
3482        let memory = self.options_memory_mut(store, params.options);
3483        let ptr = func::validate_inbounds_dynamic(
3484            &CanonicalAbiInfo::POINTER_PAIR,
3485            memory,
3486            &ValRaw::u32(params.payload),
3487        )?;
3488        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3489        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3490        Ok(ordinal)
3491    }
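    // Illustrative sketch: the two little-endian `u32`s written above form the
    // (handle, event payload) pair the guest reads back at the address given by
    // `params.payload`.  A host-side test decoding that pair might look like this
    // (hypothetical helper, not used by the runtime):
    //
    //     fn decode_wait_payload(bytes: &[u8; 8]) -> (u32, u32) {
    //         let handle = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
    //         let code = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
    //         (handle, code)
    //     }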
3492
3493    /// Implements the `subtask.cancel` intrinsic.
3494    pub(crate) fn subtask_cancel(
3495        self,
3496        store: &mut StoreOpaque,
3497        caller_instance: RuntimeComponentInstanceIndex,
3498        async_: bool,
3499        task_id: u32,
3500    ) -> Result<u32> {
3501        if !async_ {
3502            // The caller may only call `subtask.cancel` synchronously from an async
3503            // task (i.e. a task created via a call to an async export).  Otherwise,
3504            // we'll trap.
3505            store.check_blocking()?;
3506        }
3507
3508        let (rep, is_host) = store
3509            .instance_state(RuntimeInstance {
3510                instance: self.id().instance(),
3511                index: caller_instance,
3512            })
3513            .handle_table()
3514            .subtask_rep(task_id)?;
3515        let (waitable, expected_caller) = if is_host {
3516            let id = TableId::<HostTask>::new(rep);
3517            (
3518                Waitable::Host(id),
3519                store.concurrent_state_mut().get_mut(id)?.caller,
3520            )
3521        } else {
3522            let id = TableId::<GuestTask>::new(rep);
3523            if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3524                (Waitable::Guest(id), thread)
3525            } else {
3526                unreachable!()
3527            }
3528        };
3529        // Since waitables can neither be passed between instances nor forged,
3530        // this should never fail unless there's a bug in Wasmtime, but we check
3531        // here to be sure:
3532        let concurrent_state = store.concurrent_state_mut();
3533        assert_eq!(
3534            expected_caller,
3535            concurrent_state.unwrap_current_guest_thread(),
3536        );
3537
3538        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3539
3540        if let Waitable::Host(host_task) = waitable {
3541            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3542                handle.abort();
3543                return Ok(Status::ReturnCancelled as u32);
3544            }
3545        } else {
3546            let caller = concurrent_state.unwrap_current_guest_thread();
3547            let guest_task = TableId::<GuestTask>::new(rep);
3548            let task = concurrent_state.get_mut(guest_task)?;
3549            if !task.already_lowered_parameters() {
3550                // The task is in a `starting` state, meaning it hasn't run at
3551                // all yet.  Here we update its fields to indicate that it is
3552                // ready to delete immediately once `subtask.drop` is called.
3553                task.lower_params = None;
3554                task.lift_result = None;
3555                task.exited = true;
3556
3557                let instance = task.instance;
3558
3559                assert_eq!(1, task.threads.len());
3560                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3561                let concurrent_state = store.concurrent_state_mut();
3562                concurrent_state.delete(thread)?;
3563                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3564
3565                // Not yet started; cancel and remove from pending
3566                let pending = &mut store.instance_state(instance).concurrent_state().pending;
3567                let pending_count = pending.len();
3568                pending.retain(|thread, _| thread.task != guest_task);
3569                // If there were no pending threads for this task, we're in an error state
3570                if pending.len() == pending_count {
3571                    bail!("`subtask.cancel` called after terminal status delivered");
3572                }
3573                return Ok(Status::StartCancelled as u32);
3574            } else if !task.returned_or_cancelled() {
3575                // Started, but not yet returned or cancelled; send the
3576                // `CANCELLED` event
3577                task.cancel_sent = true;
3578                // Note that this might overwrite an event that was set earlier
3579                // (e.g. `Event::None` if the task is yielding, or
3580                // `Event::Cancelled` if it was already cancelled), but that's
3581                // okay -- this should supersede the previous state.
3582                task.event = Some(Event::Cancelled);
3583                let runtime_instance = task.instance.index;
3584                for thread in task.threads.clone() {
3585                    let thread = QualifiedThreadId {
3586                        task: guest_task,
3587                        thread,
3588                    };
3589                    if let Some(set) = concurrent_state
3590                        .get_mut(thread.thread)
3591                        .unwrap()
3592                        .wake_on_cancel
3593                        .take()
3594                    {
3595                        let item = match concurrent_state
3596                            .get_mut(set)?
3597                            .waiting
3598                            .remove(&thread)
3599                            .unwrap()
3600                        {
3601                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3602                            WaitMode::Callback(instance) => WorkItem::GuestCall(
3603                                runtime_instance,
3604                                GuestCall {
3605                                    thread,
3606                                    kind: GuestCallKind::DeliverEvent {
3607                                        instance,
3608                                        set: None,
3609                                    },
3610                                },
3611                            ),
3612                        };
3613                        concurrent_state.push_high_priority(item);
3614
3615                        store.suspend(SuspendReason::Yielding {
3616                            thread: caller,
3617                            // `subtask.cancel` is not allowed to be called in a
3618                            // sync context, so we cannot skip the may-block check.
3619                            skip_may_block_check: false,
3620                        })?;
3621                        break;
3622                    }
3623                }
3624
3625                let concurrent_state = store.concurrent_state_mut();
3626                let task = concurrent_state.get_mut(guest_task)?;
3627                if !task.returned_or_cancelled() {
3628                    if async_ {
3629                        return Ok(BLOCKED);
3630                    } else {
3631                        store.wait_for_event(Waitable::Guest(guest_task))?;
3632                    }
3633                }
3634            }
3635        }
3636
3637        let event = waitable.take_event(store.concurrent_state_mut())?;
3638        if let Some(Event::Subtask {
3639            status: status @ (Status::Returned | Status::ReturnCancelled),
3640        }) = event
3641        {
3642            Ok(status as u32)
3643        } else {
3644            bail!("`subtask.cancel` called after terminal status delivered");
3645        }
3646    }
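    // Sketch only: a hypothetical helper mapping the `u32` codes returned above
    // back to their meaning, using the same `Status` values and `BLOCKED`
    // sentinel referenced in this function:
    //
    //     fn describe_cancel_result(code: u32) -> &'static str {
    //         if code == BLOCKED {
    //             "cancellation in progress; async caller will receive an event"
    //         } else if code == Status::StartCancelled as u32 {
    //             "cancelled before the callee started"
    //         } else if code == Status::ReturnCancelled as u32 {
    //             "callee acknowledged the cancellation"
    //         } else if code == Status::Returned as u32 {
    //             "callee had already returned"
    //         } else {
    //             "unexpected code"
    //         }
    //     }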
3647
3648    pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3649        store.concurrent_state_mut().context_get(slot)
3650    }
3651
3652    pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3653        store.concurrent_state_mut().context_set(slot, value)
3654    }
3655}
3656
3657/// Trait representing Component Model ABI async intrinsics and fused adapter
3658/// helper functions.
3659///
3660/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3661/// which must be valid for at least the duration of the call (and possibly for
3662/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3663/// pointers used for async calls).
3664pub trait VMComponentAsyncStore {
3665    /// A helper function for fused adapter modules involving calls where at
3666    /// least one of the caller or callee is async.
3667    ///
3668    /// This helper is not used when the caller and callee both use the sync
3669    /// ABI; it is only used when at least one of them is async.
3670    unsafe fn prepare_call(
3671        &mut self,
3672        instance: Instance,
3673        memory: *mut VMMemoryDefinition,
3674        start: *mut VMFuncRef,
3675        return_: *mut VMFuncRef,
3676        caller_instance: RuntimeComponentInstanceIndex,
3677        callee_instance: RuntimeComponentInstanceIndex,
3678        task_return_type: TypeTupleIndex,
3679        callee_async: bool,
3680        string_encoding: u8,
3681        result_count: u32,
3682        storage: *mut ValRaw,
3683        storage_len: usize,
3684    ) -> Result<()>;
3685
3686    /// A helper function for fused adapter modules involving calls where the
3687    /// caller is sync-lowered but the callee is async-lifted.
3688    unsafe fn sync_start(
3689        &mut self,
3690        instance: Instance,
3691        callback: *mut VMFuncRef,
3692        callee: *mut VMFuncRef,
3693        param_count: u32,
3694        storage: *mut MaybeUninit<ValRaw>,
3695        storage_len: usize,
3696    ) -> Result<()>;
3697
3698    /// A helper function for fused adapter modules involving calls where the
3699    /// caller is async-lowered.
3700    unsafe fn async_start(
3701        &mut self,
3702        instance: Instance,
3703        callback: *mut VMFuncRef,
3704        post_return: *mut VMFuncRef,
3705        callee: *mut VMFuncRef,
3706        param_count: u32,
3707        result_count: u32,
3708        flags: u32,
3709    ) -> Result<u32>;
3710
3711    /// The `future.write` intrinsic.
3712    fn future_write(
3713        &mut self,
3714        instance: Instance,
3715        caller: RuntimeComponentInstanceIndex,
3716        ty: TypeFutureTableIndex,
3717        options: OptionsIndex,
3718        future: u32,
3719        address: u32,
3720    ) -> Result<u32>;
3721
3722    /// The `future.read` intrinsic.
3723    fn future_read(
3724        &mut self,
3725        instance: Instance,
3726        caller: RuntimeComponentInstanceIndex,
3727        ty: TypeFutureTableIndex,
3728        options: OptionsIndex,
3729        future: u32,
3730        address: u32,
3731    ) -> Result<u32>;
3732
3733    /// The `future.drop-writable` intrinsic.
3734    fn future_drop_writable(
3735        &mut self,
3736        instance: Instance,
3737        ty: TypeFutureTableIndex,
3738        writer: u32,
3739    ) -> Result<()>;
3740
3741    /// The `stream.write` intrinsic.
3742    fn stream_write(
3743        &mut self,
3744        instance: Instance,
3745        caller: RuntimeComponentInstanceIndex,
3746        ty: TypeStreamTableIndex,
3747        options: OptionsIndex,
3748        stream: u32,
3749        address: u32,
3750        count: u32,
3751    ) -> Result<u32>;
3752
3753    /// The `stream.read` intrinsic.
3754    fn stream_read(
3755        &mut self,
3756        instance: Instance,
3757        caller: RuntimeComponentInstanceIndex,
3758        ty: TypeStreamTableIndex,
3759        options: OptionsIndex,
3760        stream: u32,
3761        address: u32,
3762        count: u32,
3763    ) -> Result<u32>;
3764
3765    /// The "fast-path" implementation of the `stream.write` intrinsic for
3766    /// "flat" (i.e. memcpy-able) payloads.
3767    fn flat_stream_write(
3768        &mut self,
3769        instance: Instance,
3770        caller: RuntimeComponentInstanceIndex,
3771        ty: TypeStreamTableIndex,
3772        options: OptionsIndex,
3773        payload_size: u32,
3774        payload_align: u32,
3775        stream: u32,
3776        address: u32,
3777        count: u32,
3778    ) -> Result<u32>;
3779
3780    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3781    /// (i.e. memcpy-able) payloads.
3782    fn flat_stream_read(
3783        &mut self,
3784        instance: Instance,
3785        caller: RuntimeComponentInstanceIndex,
3786        ty: TypeStreamTableIndex,
3787        options: OptionsIndex,
3788        payload_size: u32,
3789        payload_align: u32,
3790        stream: u32,
3791        address: u32,
3792        count: u32,
3793    ) -> Result<u32>;
3794
3795    /// The `stream.drop-writable` intrinsic.
3796    fn stream_drop_writable(
3797        &mut self,
3798        instance: Instance,
3799        ty: TypeStreamTableIndex,
3800        writer: u32,
3801    ) -> Result<()>;
3802
3803    /// The `error-context.debug-message` intrinsic.
3804    fn error_context_debug_message(
3805        &mut self,
3806        instance: Instance,
3807        ty: TypeComponentLocalErrorContextTableIndex,
3808        options: OptionsIndex,
3809        err_ctx_handle: u32,
3810        debug_msg_address: u32,
3811    ) -> Result<()>;
3812
3813    /// The `thread.new-indirect` intrinsic.
3814    fn thread_new_indirect(
3815        &mut self,
3816        instance: Instance,
3817        caller: RuntimeComponentInstanceIndex,
3818        func_ty_idx: TypeFuncIndex,
3819        start_func_table_idx: RuntimeTableIndex,
3820        start_func_idx: u32,
3821        context: i32,
3822    ) -> Result<u32>;
3823}
3824
3825/// SAFETY: See trait docs.
3826impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3827    unsafe fn prepare_call(
3828        &mut self,
3829        instance: Instance,
3830        memory: *mut VMMemoryDefinition,
3831        start: *mut VMFuncRef,
3832        return_: *mut VMFuncRef,
3833        caller_instance: RuntimeComponentInstanceIndex,
3834        callee_instance: RuntimeComponentInstanceIndex,
3835        task_return_type: TypeTupleIndex,
3836        callee_async: bool,
3837        string_encoding: u8,
3838        result_count_or_max_if_async: u32,
3839        storage: *mut ValRaw,
3840        storage_len: usize,
3841    ) -> Result<()> {
3842        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3843        // this method will have ensured that `storage` is a valid
3844        // pointer containing at least `storage_len` items.
3845        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3846
3847        unsafe {
3848            instance.prepare_call(
3849                StoreContextMut(self),
3850                start,
3851                return_,
3852                caller_instance,
3853                callee_instance,
3854                task_return_type,
3855                callee_async,
3856                memory,
3857                string_encoding,
3858                match result_count_or_max_if_async {
3859                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3860                        params,
3861                        has_result: false,
3862                    },
3863                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3864                        params,
3865                        has_result: true,
3866                    },
3867                    result_count => CallerInfo::Sync {
3868                        params,
3869                        result_count,
3870                    },
3871                },
3872            )
3873        }
3874    }
3875
3876    unsafe fn sync_start(
3877        &mut self,
3878        instance: Instance,
3879        callback: *mut VMFuncRef,
3880        callee: *mut VMFuncRef,
3881        param_count: u32,
3882        storage: *mut MaybeUninit<ValRaw>,
3883        storage_len: usize,
3884    ) -> Result<()> {
3885        unsafe {
3886            instance
3887                .start_call(
3888                    StoreContextMut(self),
3889                    callback,
3890                    ptr::null_mut(),
3891                    callee,
3892                    param_count,
3893                    1,
3894                    START_FLAG_ASYNC_CALLEE,
3895                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3896                    // this method will have ensured that `storage` is a valid
3897                    // pointer containing at least `storage_len` items.
3898                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3899                )
3900                .map(drop)
3901        }
3902    }
3903
3904    unsafe fn async_start(
3905        &mut self,
3906        instance: Instance,
3907        callback: *mut VMFuncRef,
3908        post_return: *mut VMFuncRef,
3909        callee: *mut VMFuncRef,
3910        param_count: u32,
3911        result_count: u32,
3912        flags: u32,
3913    ) -> Result<u32> {
3914        unsafe {
3915            instance.start_call(
3916                StoreContextMut(self),
3917                callback,
3918                post_return,
3919                callee,
3920                param_count,
3921                result_count,
3922                flags,
3923                None,
3924            )
3925        }
3926    }
3927
3928    fn future_write(
3929        &mut self,
3930        instance: Instance,
3931        caller: RuntimeComponentInstanceIndex,
3932        ty: TypeFutureTableIndex,
3933        options: OptionsIndex,
3934        future: u32,
3935        address: u32,
3936    ) -> Result<u32> {
3937        instance
3938            .guest_write(
3939                StoreContextMut(self),
3940                caller,
3941                TransmitIndex::Future(ty),
3942                options,
3943                None,
3944                future,
3945                address,
3946                1,
3947            )
3948            .map(|result| result.encode())
3949    }
3950
3951    fn future_read(
3952        &mut self,
3953        instance: Instance,
3954        caller: RuntimeComponentInstanceIndex,
3955        ty: TypeFutureTableIndex,
3956        options: OptionsIndex,
3957        future: u32,
3958        address: u32,
3959    ) -> Result<u32> {
3960        instance
3961            .guest_read(
3962                StoreContextMut(self),
3963                caller,
3964                TransmitIndex::Future(ty),
3965                options,
3966                None,
3967                future,
3968                address,
3969                1,
3970            )
3971            .map(|result| result.encode())
3972    }
3973
3974    fn stream_write(
3975        &mut self,
3976        instance: Instance,
3977        caller: RuntimeComponentInstanceIndex,
3978        ty: TypeStreamTableIndex,
3979        options: OptionsIndex,
3980        stream: u32,
3981        address: u32,
3982        count: u32,
3983    ) -> Result<u32> {
3984        instance
3985            .guest_write(
3986                StoreContextMut(self),
3987                caller,
3988                TransmitIndex::Stream(ty),
3989                options,
3990                None,
3991                stream,
3992                address,
3993                count,
3994            )
3995            .map(|result| result.encode())
3996    }
3997
3998    fn stream_read(
3999        &mut self,
4000        instance: Instance,
4001        caller: RuntimeComponentInstanceIndex,
4002        ty: TypeStreamTableIndex,
4003        options: OptionsIndex,
4004        stream: u32,
4005        address: u32,
4006        count: u32,
4007    ) -> Result<u32> {
4008        instance
4009            .guest_read(
4010                StoreContextMut(self),
4011                caller,
4012                TransmitIndex::Stream(ty),
4013                options,
4014                None,
4015                stream,
4016                address,
4017                count,
4018            )
4019            .map(|result| result.encode())
4020    }
4021
4022    fn future_drop_writable(
4023        &mut self,
4024        instance: Instance,
4025        ty: TypeFutureTableIndex,
4026        writer: u32,
4027    ) -> Result<()> {
4028        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4029    }
4030
4031    fn flat_stream_write(
4032        &mut self,
4033        instance: Instance,
4034        caller: RuntimeComponentInstanceIndex,
4035        ty: TypeStreamTableIndex,
4036        options: OptionsIndex,
4037        payload_size: u32,
4038        payload_align: u32,
4039        stream: u32,
4040        address: u32,
4041        count: u32,
4042    ) -> Result<u32> {
4043        instance
4044            .guest_write(
4045                StoreContextMut(self),
4046                caller,
4047                TransmitIndex::Stream(ty),
4048                options,
4049                Some(FlatAbi {
4050                    size: payload_size,
4051                    align: payload_align,
4052                }),
4053                stream,
4054                address,
4055                count,
4056            )
4057            .map(|result| result.encode())
4058    }
4059
4060    fn flat_stream_read(
4061        &mut self,
4062        instance: Instance,
4063        caller: RuntimeComponentInstanceIndex,
4064        ty: TypeStreamTableIndex,
4065        options: OptionsIndex,
4066        payload_size: u32,
4067        payload_align: u32,
4068        stream: u32,
4069        address: u32,
4070        count: u32,
4071    ) -> Result<u32> {
4072        instance
4073            .guest_read(
4074                StoreContextMut(self),
4075                caller,
4076                TransmitIndex::Stream(ty),
4077                options,
4078                Some(FlatAbi {
4079                    size: payload_size,
4080                    align: payload_align,
4081                }),
4082                stream,
4083                address,
4084                count,
4085            )
4086            .map(|result| result.encode())
4087    }
4088
4089    fn stream_drop_writable(
4090        &mut self,
4091        instance: Instance,
4092        ty: TypeStreamTableIndex,
4093        writer: u32,
4094    ) -> Result<()> {
4095        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4096    }
4097
4098    fn error_context_debug_message(
4099        &mut self,
4100        instance: Instance,
4101        ty: TypeComponentLocalErrorContextTableIndex,
4102        options: OptionsIndex,
4103        err_ctx_handle: u32,
4104        debug_msg_address: u32,
4105    ) -> Result<()> {
4106        instance.error_context_debug_message(
4107            StoreContextMut(self),
4108            ty,
4109            options,
4110            err_ctx_handle,
4111            debug_msg_address,
4112        )
4113    }
4114
4115    fn thread_new_indirect(
4116        &mut self,
4117        instance: Instance,
4118        caller: RuntimeComponentInstanceIndex,
4119        func_ty_idx: TypeFuncIndex,
4120        start_func_table_idx: RuntimeTableIndex,
4121        start_func_idx: u32,
4122        context: i32,
4123    ) -> Result<u32> {
4124        instance.thread_new_indirect(
4125            StoreContextMut(self),
4126            caller,
4127            func_ty_idx,
4128            start_func_table_idx,
4129            start_func_idx,
4130            context,
4131        )
4132    }
4133}
4134
4135type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4136
4137/// Represents the state of a pending host task.
4138///
4139/// This is used to represent tasks when the guest calls into the host.
4140struct HostTask {
4141    common: WaitableCommon,
4142
4143    /// Guest thread which called the host.
4144    caller: QualifiedThreadId,
4145
4146    /// State of borrows, etc., that the host needs to track; used, for example,
4147    /// when the guest passes borrows to the host.
4148    call_context: CallContext,
4149
4150    /// For host tasks which end up doing some asynchronous work (e.g.
4151    /// async-lowered and didn't complete on the first poll) this handle is used
4152    /// to cancel the future while it resides in the store's
4153    /// `FuturesUnordered`.
4154    join_handle: Option<JoinHandle>,
4155
4156    /// Type-erased (`Box<dyn Any>`) result of this host task, if available.
4157    result: Option<LiftedResult>,
4158}
4159
4160impl HostTask {
4161    fn new(caller: QualifiedThreadId) -> Self {
4162        Self {
4163            common: WaitableCommon::default(),
4164            call_context: CallContext::default(),
4165            caller,
4166            join_handle: None,
4167            result: None,
4168        }
4169    }
4170}
4171
4172impl TableDebug for HostTask {
4173    fn type_name() -> &'static str {
4174        "HostTask"
4175    }
4176}
4177
4178type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4179
4180/// Represents the caller of a given guest task.
4181enum Caller {
4182    /// The host called the guest task.
4183    Host {
4184        /// If present, may be used to deliver the result.
4185        tx: Option<oneshot::Sender<LiftedResult>>,
4186        /// Channel to notify once all subtasks spawned by this caller have
4187        /// completed.
4188        ///
4189        /// Note that we'll never actually send anything to this channel;
4190        /// dropping it when the refcount goes to zero is sufficient to notify
4191        /// the receiver.
4192        exit_tx: Arc<oneshot::Sender<()>>,
4193        /// If true, there's a host future that must be dropped before the task
4194        /// can be deleted.
4195        host_future_present: bool,
4196        /// Represents the caller of the host function which called back into a
4197        /// guest. Note that this thread could belong to a top-level component
4198        /// instance entirely unrelated to the one the host called into.
4199        caller: CurrentThread,
4200    },
4201    /// Another guest thread called the guest task.
4202    Guest {
4203        /// The id of the calling thread.
4204        thread: QualifiedThreadId,
4205    },
4206}
4207
4208/// Represents a closure and related canonical ABI parameters required to
4209/// validate a `task.return` call at runtime and lift the result.
4210struct LiftResult {
4211    lift: RawLift,
4212    ty: TypeTupleIndex,
4213    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4214    string_encoding: StringEncoding,
4215}
4216
4217/// The table ID for a guest thread, qualified by the task to which it belongs.
4218///
4219/// This exists to minimize table lookups and the need to pass stores around mutably
4220/// for the common case of identifying the task to which a thread belongs.
4221#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4222struct QualifiedThreadId {
4223    task: TableId<GuestTask>,
4224    thread: TableId<GuestThread>,
4225}
4226
4227impl QualifiedThreadId {
4228    fn qualify(
4229        state: &mut ConcurrentState,
4230        thread: TableId<GuestThread>,
4231    ) -> Result<QualifiedThreadId> {
4232        Ok(QualifiedThreadId {
4233            task: state.get_mut(thread)?.parent_task,
4234            thread,
4235        })
4236    }
4237}
4238
4239impl fmt::Debug for QualifiedThreadId {
4240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4241        f.debug_tuple("QualifiedThreadId")
4242            .field(&self.task.rep())
4243            .field(&self.thread.rep())
4244            .finish()
4245    }
4246}
4247
4248enum GuestThreadState {
4249    NotStartedImplicit,
4250    NotStartedExplicit(
4251        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4252    ),
4253    Running,
4254    Suspended(StoreFiber<'static>),
4255    Ready(StoreFiber<'static>),
4256    Completed,
4257}

/// Represents a single guest execution context (thread) belonging to a
/// `GuestTask`.
4258pub struct GuestThread {
4259    /// Context-local state used to implement the `context.{get,set}`
4260    /// intrinsics.
4261    context: [u32; 2],
4262    /// The owning guest task.
4263    parent_task: TableId<GuestTask>,
4264    /// If present, indicates that the thread is currently waiting on the
4265    /// specified set but may be cancelled and woken immediately.
4266    wake_on_cancel: Option<TableId<WaitableSet>>,
4267    /// The execution state of this guest thread
4268    state: GuestThreadState,
4269    /// The index of this thread in the component instance's handle table.
4270    /// This must always be `Some` after initialization.
4271    instance_rep: Option<u32>,
4272}
4273
4274impl GuestThread {
4275    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4276    /// handle.
4277    fn from_instance(
4278        state: Pin<&mut ComponentInstance>,
4279        caller_instance: RuntimeComponentInstanceIndex,
4280        guest_thread: u32,
4281    ) -> Result<TableId<Self>> {
4282        let rep = state.instance_states().0[caller_instance]
4283            .thread_handle_table()
4284            .guest_thread_rep(guest_thread)?;
4285        Ok(TableId::new(rep))
4286    }
4287
4288    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4289        Self {
4290            context: [0; 2],
4291            parent_task,
4292            wake_on_cancel: None,
4293            state: GuestThreadState::NotStartedImplicit,
4294            instance_rep: None,
4295        }
4296    }
4297
4298    fn new_explicit(
4299        parent_task: TableId<GuestTask>,
4300        start_func: Box<
4301            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4302        >,
4303    ) -> Self {
4304        Self {
4305            context: [0; 2],
4306            parent_task,
4307            wake_on_cancel: None,
4308            state: GuestThreadState::NotStartedExplicit(start_func),
4309            instance_rep: None,
4310        }
4311    }
4312}
4313
4314impl TableDebug for GuestThread {
4315    fn type_name() -> &'static str {
4316        "GuestThread"
4317    }
4318}
4319
4320enum SyncResult {
4321    NotProduced,
4322    Produced(Option<ValRaw>),
4323    Taken,
4324}
4325
4326impl SyncResult {
4327    fn take(&mut self) -> Option<Option<ValRaw>> {
4328        match mem::replace(self, SyncResult::Taken) {
4329            SyncResult::NotProduced => None,
4330            SyncResult::Produced(val) => Some(val),
4331            SyncResult::Taken => {
4332                panic!("attempted to take a synchronous result that was already taken")
4333            }
4334        }
4335    }
4336}
4337
4338#[derive(Debug)]
4339enum HostFutureState {
4340    NotApplicable,
4341    Live,
4342    Dropped,
4343}
4344
4345/// Represents a pending guest task.
4346pub(crate) struct GuestTask {
4347    /// See `WaitableCommon`
4348    common: WaitableCommon,
4349    /// Closure to lower the parameters passed to this task.
4350    lower_params: Option<RawLower>,
4351    /// See `LiftResult`
4352    lift_result: Option<LiftResult>,
4353    /// A place to stash the type-erased lifted result if it can't be delivered
4354    /// immediately.
4355    result: Option<LiftedResult>,
4356    /// Closure to call the callback function for an async-lifted export, if
4357    /// provided.
4358    callback: Option<CallbackFn>,
4359    /// See `Caller`
4360    caller: Caller,
4361    /// Borrow state for this task.
4362    ///
4363    /// Keeps track of `borrow<T>` handles passed to this task to ensure that
4364    /// they have all been dropped by the time it exits.
4365    call_context: CallContext,
4366    /// A place to stash the lowered result for a sync-to-async call until it
4367    /// can be returned to the caller.
4368    sync_result: SyncResult,
4369    /// Whether or not a cancellation request has been sent to the task (i.e.
4370    /// whether the task is permitted to call `task.cancel`).
4371    cancel_sent: bool,
4372    /// Whether or not we've sent a `Status::Starting` event to any current or
4373    /// future waiters for this waitable.
4374    starting_sent: bool,
4375    /// Pending guest subtasks created by this task (directly or indirectly).
4376    ///
4377    /// This is used to re-parent subtasks which are still running when their
4378    /// parent task is disposed.
4379    subtasks: HashSet<TableId<GuestTask>>,
4380    /// Scratch waitable set used to watch subtasks during synchronous calls.
4381    sync_call_set: TableId<WaitableSet>,
4382    /// The runtime instance to which the exported function for this guest task
4383    /// belongs.
4384    ///
4385    /// Note that the task may do a sync->sync call via a fused adapter which
4386    /// results in that task executing code in a different instance, and it may
4387    /// call host functions and intrinsics from that other instance.
4388    instance: RuntimeInstance,
4389    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4390    /// delivered to this task.
4391    event: Option<Event>,
4392    /// Whether or not the task has exited.
4393    exited: bool,
4394    /// Threads belonging to this task
4395    threads: HashSet<TableId<GuestThread>>,
4396    /// The state of the host future that represents an async task, which must
4397    /// be dropped before we can delete the task.
4398    host_future_state: HostFutureState,
4399    /// Indicates whether this task was created for a call to an async-lifted
4400    /// export.
4401    async_function: bool,
4402}
4403
4404impl GuestTask {
4405    fn already_lowered_parameters(&self) -> bool {
4406        // We reset `lower_params` after we lower the parameters
4407        self.lower_params.is_none()
4408    }
4409
4410    fn returned_or_cancelled(&self) -> bool {
4411        // We reset `lift_result` after we return or exit
4412        self.lift_result.is_none()
4413    }
4414
4415    fn ready_to_delete(&self) -> bool {
4416        let threads_completed = self.threads.is_empty();
4417        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4418        let pending_completion_event = matches!(
4419            self.common.event,
4420            Some(Event::Subtask {
4421                status: Status::Returned | Status::ReturnCancelled
4422            })
4423        );
4424        let ready = threads_completed
4425            && !has_sync_result
4426            && !pending_completion_event
4427            && !matches!(self.host_future_state, HostFutureState::Live);
4428        log::trace!(
4429            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4430            threads_completed,
4431            has_sync_result,
4432            pending_completion_event,
4433            self.host_future_state
4434        );
4435        ready
4436    }
4437
4438    fn new(
4439        state: &mut ConcurrentState,
4440        lower_params: RawLower,
4441        lift_result: LiftResult,
4442        caller: Caller,
4443        callback: Option<CallbackFn>,
4444        instance: RuntimeInstance,
4445        async_function: bool,
4446    ) -> Result<Self> {
4447        let sync_call_set = state.push(WaitableSet::default())?;
4448        let host_future_state = match &caller {
4449            Caller::Guest { .. } => HostFutureState::NotApplicable,
4450            Caller::Host {
4451                host_future_present,
4452                ..
4453            } => {
4454                if *host_future_present {
4455                    HostFutureState::Live
4456                } else {
4457                    HostFutureState::NotApplicable
4458                }
4459            }
4460        };
4461        Ok(Self {
4462            common: WaitableCommon::default(),
4463            lower_params: Some(lower_params),
4464            lift_result: Some(lift_result),
4465            result: None,
4466            callback,
4467            caller,
4468            call_context: CallContext::default(),
4469            sync_result: SyncResult::NotProduced,
4470            cancel_sent: false,
4471            starting_sent: false,
4472            subtasks: HashSet::new(),
4473            sync_call_set,
4474            instance,
4475            event: None,
4476            exited: false,
4477            threads: HashSet::new(),
4478            host_future_state,
4479            async_function,
4480        })
4481    }
4482
4483    /// Dispose of this guest task, reparenting any pending subtasks to the
4484    /// caller.
4485    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4486        // If there are not-yet-delivered completion events for subtasks in
4487        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4488        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4489            if let Some(Event::Subtask {
4490                status: Status::Returned | Status::ReturnCancelled,
4491            }) = waitable.common(state)?.event
4492            {
4493                waitable.delete_from(state)?;
4494            }
4495        }
4496
4497        assert!(self.threads.is_empty());
4498
4499        state.delete(self.sync_call_set)?;
4500
4501        // Reparent any pending subtasks to the caller.
4502        match &self.caller {
4503            Caller::Guest { thread } => {
4504                let task_mut = state.get_mut(thread.task)?;
4505                let present = task_mut.subtasks.remove(&me);
4506                assert!(present);
4507
4508                for subtask in &self.subtasks {
4509                    task_mut.subtasks.insert(*subtask);
4510                }
4511
4512                for subtask in &self.subtasks {
4513                    state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4514                }
4515            }
4516            Caller::Host {
4517                exit_tx, caller, ..
4518            } => {
4519                for subtask in &self.subtasks {
4520                    state.get_mut(*subtask)?.caller = Caller::Host {
4521                        tx: None,
4522                        // Clone `exit_tx` to ensure that it is only dropped
4523                        // once all transitive subtasks of the host call have
4524                        // exited:
4525                        exit_tx: exit_tx.clone(),
4526                        host_future_present: false,
4527                        caller: *caller,
4528                    };
4529                }
4530            }
4531        }
4532
4533        for subtask in self.subtasks {
4534            let task = state.get_mut(subtask)?;
4535            if task.exited && task.ready_to_delete() {
4536                Waitable::Guest(subtask).delete_from(state)?;
4537            }
4538        }
4539
4540        Ok(())
4541    }
4542}
4543
4544impl TableDebug for GuestTask {
4545    fn type_name() -> &'static str {
4546        "GuestTask"
4547    }
4548}
4549
4550/// Represents state common to all kinds of waitables.
4551#[derive(Default)]
4552struct WaitableCommon {
4553    /// The currently pending event for this waitable, if any.
4554    event: Option<Event>,
4555    /// The set to which this waitable belongs, if any.
4556    set: Option<TableId<WaitableSet>>,
4557    /// The handle with which the guest refers to this waitable, if any.
4558    handle: Option<u32>,
4559}
4560
4561/// Represents a Component Model Async `waitable`.
4562#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4563enum Waitable {
4564    /// A host task
4565    Host(TableId<HostTask>),
4566    /// A guest task
4567    Guest(TableId<GuestTask>),
4568    /// The read or write end of a stream or future
4569    Transmit(TableId<TransmitHandle>),
4570}
4571
4572impl Waitable {
4573    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4574    /// handle.
4575    fn from_instance(
4576        state: Pin<&mut ComponentInstance>,
4577        caller_instance: RuntimeComponentInstanceIndex,
4578        waitable: u32,
4579    ) -> Result<Self> {
4580        use crate::runtime::vm::component::Waitable;
4581
4582        let (waitable, kind) = state.instance_states().0[caller_instance]
4583            .handle_table()
4584            .waitable_rep(waitable)?;
4585
4586        Ok(match kind {
4587            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4588            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4589            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4590        })
4591    }
4592
4593    /// Retrieve the host-visible identifier for this `Waitable`.
4594    fn rep(&self) -> u32 {
4595        match self {
4596            Self::Host(id) => id.rep(),
4597            Self::Guest(id) => id.rep(),
4598            Self::Transmit(id) => id.rep(),
4599        }
4600    }
4601
4602    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4603    /// remove it from any set it may currently belong to (when `set` is
4604    /// `None`).
4605    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4606        log::trace!("waitable {self:?} join set {set:?}",);
4607
4608        let old = mem::replace(&mut self.common(state)?.set, set);
4609
4610        if let Some(old) = old {
4611            match *self {
4612                Waitable::Host(id) => state.remove_child(id, old),
4613                Waitable::Guest(id) => state.remove_child(id, old),
4614                Waitable::Transmit(id) => state.remove_child(id, old),
4615            }?;
4616
4617            state.get_mut(old)?.ready.remove(self);
4618        }
4619
4620        if let Some(set) = set {
4621            match *self {
4622                Waitable::Host(id) => state.add_child(id, set),
4623                Waitable::Guest(id) => state.add_child(id, set),
4624                Waitable::Transmit(id) => state.add_child(id, set),
4625            }?;
4626
4627            if self.common(state)?.event.is_some() {
4628                self.mark_ready(state)?;
4629            }
4630        }
4631
4632        Ok(())
4633    }
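
    // For illustration: callers of `join` either attach a waitable to a set or
    // detach it from whatever set it currently belongs to.  A sketch, assuming
    // `state: &mut ConcurrentState`, `waitable: Waitable`, and
    // `set: TableId<WaitableSet>` are in scope:
    //
    //     waitable.join(state, Some(set))?; // add to (or move into) `set`
    //     waitable.join(state, None)?;      // remove from its current set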
4634
4635    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4636    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4637        Ok(match self {
4638            Self::Host(id) => &mut state.get_mut(*id)?.common,
4639            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4640            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4641        })
4642    }
4643
4644    /// Set or clear the pending event for this waitable and either deliver it
4645    /// to the first waiter, if any, or mark it as ready to be delivered to the
4646    /// next waiter that arrives.
4647    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4648        log::trace!("set event for {self:?}: {event:?}");
4649        self.common(state)?.event = event;
4650        self.mark_ready(state)
4651    }
4652
4653    /// Take the pending event from this waitable, leaving `None` in its place.
4654    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4655        let common = self.common(state)?;
4656        let event = common.event.take();
4657        if let Some(set) = self.common(state)?.set {
4658            state.get_mut(set)?.ready.remove(self);
4659        }
4660
4661        Ok(event)
4662    }
4663
4664    /// Deliver the current event for this waitable to the first waiter, if any,
4665    /// or else mark it as ready to be delivered to the next waiter that
4666    /// arrives.
4667    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4668        if let Some(set) = self.common(state)?.set {
4669            state.get_mut(set)?.ready.insert(*self);
4670            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4671                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4672                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4673
4674                let item = match mode {
4675                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4676                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4677                        state.get_mut(thread.task)?.instance.index,
4678                        GuestCall {
4679                            thread,
4680                            kind: GuestCallKind::DeliverEvent {
4681                                instance,
4682                                set: Some(set),
4683                            },
4684                        },
4685                    ),
4686                };
4687                state.push_high_priority(item);
4688            }
4689        }
4690        Ok(())
4691    }
4692
4693    /// Remove this waitable from the instance's rep table.
4694    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4695        match self {
4696            Self::Host(task) => {
4697                log::trace!("delete host task {task:?}");
4698                state.delete(*task)?;
4699            }
4700            Self::Guest(task) => {
4701                log::trace!("delete guest task {task:?}");
4702                state.delete(*task)?.dispose(state, *task)?;
4703            }
4704            Self::Transmit(task) => {
4705                state.delete(*task)?;
4706            }
4707        }
4708
4709        Ok(())
4710    }
4711}
4712
4713impl fmt::Debug for Waitable {
4714    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4715        match self {
4716            Self::Host(id) => write!(f, "{id:?}"),
4717            Self::Guest(id) => write!(f, "{id:?}"),
4718            Self::Transmit(id) => write!(f, "{id:?}"),
4719        }
4720    }
4721}
4722
4723/// Represents a Component Model Async `waitable-set`.
4724#[derive(Default)]
4725struct WaitableSet {
4726    /// Which waitables in this set have pending events, if any.
4727    ready: BTreeSet<Waitable>,
4728    /// Which guest threads are currently waiting on this set, if any.
4729    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4730}
4731
4732impl TableDebug for WaitableSet {
4733    fn type_name() -> &'static str {
4734        "WaitableSet"
4735    }
4736}
4737
4738/// Type-erased closure to lower the parameters for a guest task.
4739type RawLower =
4740    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4741
4742/// Type-erased closure to lift the result for a guest task.
4743type RawLift = Box<
4744    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4745>;
4746
4747/// Type-erased result of a guest task which may be downcast to the expected
4748/// type by a host caller (or simply ignored in the case of a guest caller; see
4749/// `DummyResult`).
4750type LiftedResult = Box<dyn Any + Send + Sync>;
4751
4752/// Used to return a result from a `LiftFn` when the actual result has already
4753/// been lowered to a guest task's stack and linear memory.
4754struct DummyResult;
4755
4756/// Represents the Component Model Async state of a (sub-)component instance.
4757#[derive(Default)]
4758pub struct ConcurrentInstanceState {
4759    /// Backpressure count for this instance (backpressure is enabled while >0).
4760    backpressure: u16,
4761    /// Whether this instance can be entered
4762    do_not_enter: bool,
4763    /// Pending calls for this instance which require `Self::backpressure` to be
4764    /// zero and/or `Self::do_not_enter` to be `false` before they can proceed.
4765    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4766}
4767
4768impl ConcurrentInstanceState {
4769    pub fn pending_is_empty(&self) -> bool {
4770        self.pending.is_empty()
4771    }
4772}
4773
4774#[derive(Debug, Copy, Clone)]
4775enum CurrentThread {
4776    Guest(QualifiedThreadId),
4777    Host(TableId<HostTask>),
4778    None,
4779}
4780
4781impl CurrentThread {
4782    fn guest(&self) -> Option<&QualifiedThreadId> {
4783        match self {
4784            Self::Guest(id) => Some(id),
4785            _ => None,
4786        }
4787    }
4788
4789    fn host(&self) -> Option<TableId<HostTask>> {
4790        match self {
4791            Self::Host(id) => Some(*id),
4792            _ => None,
4793        }
4794    }
4795
4796    fn is_none(&self) -> bool {
4797        matches!(self, Self::None)
4798    }
4799}
4800
4801impl From<QualifiedThreadId> for CurrentThread {
4802    fn from(id: QualifiedThreadId) -> Self {
4803        Self::Guest(id)
4804    }
4805}
4806
4807impl From<TableId<HostTask>> for CurrentThread {
4808    fn from(id: TableId<HostTask>) -> Self {
4809        Self::Host(id)
4810    }
4811}
4812
4813/// Represents the Component Model Async state of a store.
4814pub struct ConcurrentState {
4815    /// The currently running thread, if any.
4816    current_thread: CurrentThread,
4817
4818    /// The set of pending host and background tasks, if any.
4819    ///
4820    /// See `ComponentInstance::poll_until` for where we temporarily take this
4821    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4822    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4823    /// The table of waitables, waitable sets, etc.
4824    table: AlwaysMut<ResourceTable>,
4825    /// The "high priority" work queue for this store's event loop.
4826    high_priority: Vec<WorkItem>,
4827    /// The "low priority" work queue for this store's event loop.
4828    low_priority: VecDeque<WorkItem>,
4829    /// A place to stash the reason a fiber is suspending so that the code which
4830    /// resumed it will know under what conditions the fiber should be resumed
4831    /// again.
4832    suspend_reason: Option<SuspendReason>,
4833    /// A cached fiber which is waiting for work to do.
4834    ///
4835    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4836    worker: Option<StoreFiber<'static>>,
4837    /// A place to stash the work item for which we're resuming a worker fiber.
4838    worker_item: Option<WorkerItem>,
4839
4840    /// Reference counts for all component error contexts
4841    ///
4842    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
4843    /// (sub)component ref counts as tracked by `error_context_tables`, for
4844    /// example when the host holds one or more references to error contexts.
4845    ///
4846    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4847    /// component-wide representation) of the index into concurrent state for a given
4848    /// stored `ErrorContext`.
4849    ///
4850    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4851    /// as a `TableId<ErrorContextState>`.
4852    global_error_context_ref_counts:
4853        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4854}
4855
4856impl Default for ConcurrentState {
4857    fn default() -> Self {
4858        Self {
4859            current_thread: CurrentThread::None,
4860            table: AlwaysMut::new(ResourceTable::new()),
4861            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4862            high_priority: Vec::new(),
4863            low_priority: VecDeque::new(),
4864            suspend_reason: None,
4865            worker: None,
4866            worker_item: None,
4867            global_error_context_ref_counts: BTreeMap::new(),
4868        }
4869    }
4870}
4871
4872impl ConcurrentState {
4873    /// Take ownership of any fibers and futures owned by this object.
4874    ///
4875    /// This should be used when disposing of the `Store` containing this object
4876    /// in order to gracefully resolve any and all fibers using
4877    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4878    /// use-after-free bugs due to fibers which may still have access to the
4879    /// `Store`.
4880    ///
4881    /// Additionally, the futures collected with this function should be dropped
4882    /// within a `tls::set` call, which will ensure that any futures closing
4883    /// over an `&Accessor` will have access to the store when dropped, allowing
4884    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4885    /// panicking.
4886    ///
4887    /// Note that this will leave the object in an inconsistent and unusable
4888    /// state, so it should only be used just prior to dropping it.
4889    pub(crate) fn take_fibers_and_futures(
4890        &mut self,
4891        fibers: &mut Vec<StoreFiber<'static>>,
4892        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4893    ) {
4894        for entry in self.table.get_mut().iter_mut() {
4895            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4896                for mode in mem::take(&mut set.waiting).into_values() {
4897                    if let WaitMode::Fiber(fiber) = mode {
4898                        fibers.push(fiber);
4899                    }
4900                }
4901            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4902                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4903                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4904                {
4905                    fibers.push(fiber);
4906                }
4907            }
4908        }
4909
4910        if let Some(fiber) = self.worker.take() {
4911            fibers.push(fiber);
4912        }
4913
4914        let mut handle_item = |item| match item {
4915            WorkItem::ResumeFiber(fiber) => {
4916                fibers.push(fiber);
4917            }
4918            WorkItem::PushFuture(future) => {
4919                self.futures
4920                    .get_mut()
4921                    .as_mut()
4922                    .unwrap()
4923                    .push(future.into_inner());
4924            }
4925            _ => {}
4926        };
4927
4928        for item in mem::take(&mut self.high_priority) {
4929            handle_item(item);
4930        }
4931        for item in mem::take(&mut self.low_priority) {
4932            handle_item(item);
4933        }
4934
4935        if let Some(them) = self.futures.get_mut().take() {
4936            futures.push(them);
4937        }
4938    }
4939
4940    /// Collect the next set of work items to run. This will be either all
4941    /// high-priority items, or a single low-priority item if there are no
4942    /// high-priority items.
4943    fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4944        let mut ready = mem::take(&mut self.high_priority);
4945        if ready.is_empty() {
4946            if let Some(item) = self.low_priority.pop_back() {
4947                ready.push(item);
4948            }
4949        }
4950        ready
4951    }
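
    // For illustration: if `high_priority` holds items [A, B] and
    // `low_priority` holds [C, D], a call to `collect_work_items_to_run`
    // returns [A, B]; only once the high-priority queue is empty does a
    // single low-priority item (the oldest, since `push_low_priority` pushes
    // to the front and this method pops from the back) get returned.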
4952
4953    fn push<V: Send + Sync + 'static>(
4954        &mut self,
4955        value: V,
4956    ) -> Result<TableId<V>, ResourceTableError> {
4957        self.table.get_mut().push(value).map(TableId::from)
4958    }
4959
4960    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4961        self.table.get_mut().get_mut(&Resource::from(id))
4962    }
4963
4964    pub fn add_child<T: 'static, U: 'static>(
4965        &mut self,
4966        child: TableId<T>,
4967        parent: TableId<U>,
4968    ) -> Result<(), ResourceTableError> {
4969        self.table
4970            .get_mut()
4971            .add_child(Resource::from(child), Resource::from(parent))
4972    }
4973
4974    pub fn remove_child<T: 'static, U: 'static>(
4975        &mut self,
4976        child: TableId<T>,
4977        parent: TableId<U>,
4978    ) -> Result<(), ResourceTableError> {
4979        self.table
4980            .get_mut()
4981            .remove_child(Resource::from(child), Resource::from(parent))
4982    }
4983
4984    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4985        self.table.get_mut().delete(Resource::from(id))
4986    }
4987
4988    fn push_future(&mut self, future: HostTaskFuture) {
4989        // Note that we can't directly push to `ConcurrentState::futures` here
4990        // since this may be called from a future that's being polled inside
4991        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4992        // so it has exclusive access while polling it.  Therefore, we push a
4993        // work item to the "high priority" queue, which will actually push to
4994        // `ConcurrentState::futures` later.
4995        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4996    }
4997
4998    fn push_high_priority(&mut self, item: WorkItem) {
4999        log::trace!("push high priority: {item:?}");
5000        self.high_priority.push(item);
5001    }
5002
5003    fn push_low_priority(&mut self, item: WorkItem) {
5004        log::trace!("push low priority: {item:?}");
5005        self.low_priority.push_front(item);
5006    }
5007
5008    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5009        if high_priority {
5010            self.push_high_priority(item);
5011        } else {
5012            self.push_low_priority(item);
5013        }
5014    }
5015
5016    fn promote_instance_local_thread_work_item(
5017        &mut self,
5018        current_instance: RuntimeComponentInstanceIndex,
5019    ) -> bool {
5020        self.promote_work_items_matching(|item: &WorkItem| match item {
5021            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5022                *instance == current_instance
5023            }
5024            _ => false,
5025        })
5026    }
5027
5028    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5029        self.promote_work_items_matching(|item: &WorkItem| match item {
5030            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5031                *t == thread
5032            }
5033            _ => false,
5034        })
5035    }
5036
5037    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5038    where
5039        F: FnMut(&WorkItem) -> bool,
5040    {
5041        // If there's already a high-priority work item matching the predicate, we
5042        // don't need to promote anything, but we return true to indicate that
5043        // matching work is pending.
5044        if self.high_priority.iter().any(&mut predicate) {
5045            true
5046        }
5047        // Otherwise, look for a low-priority work item matching the predicate and
5048        // promote it to high-priority.
5049        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5050            let item = self.low_priority.remove(idx).unwrap();
5051            self.push_high_priority(item);
5052            true
5053        } else {
5054            false
5055        }
5056    }
5057
5058    /// Implements the `context.get` intrinsic.
5059    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5060        let thread = self.unwrap_current_guest_thread();
5061        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5062        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5063        Ok(val)
5064    }
5065
5066    /// Implements the `context.set` intrinsic.
5067    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5068        let thread = self.unwrap_current_guest_thread();
5069        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5070        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5071        Ok(())
5072    }
5073
5074    /// Returns whether there's a pending cancellation on the current guest thread,
5075    /// consuming the event if so.
5076    fn take_pending_cancellation(&mut self) -> bool {
5077        let thread = self.unwrap_current_guest_thread();
5078        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5079            assert!(matches!(event, Event::Cancelled));
5080            true
5081        } else {
5082            false
5083        }
5084    }
5085
5086    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5087        if self.may_block(task) {
5088            Ok(())
5089        } else {
5090            Err(Trap::CannotBlockSyncTask.into())
5091        }
5092    }
5093
5094    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5095        let task = self.get_mut(task).unwrap();
5096        task.async_function || task.returned_or_cancelled()
5097    }
5098
5099    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5100    /// specified task.
5101    ///
5102    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5103    /// below.
5104    pub fn call_context(&mut self, task: u32) -> &mut CallContext {
5105        let (task, is_host) = (task >> 1, task & 1 == 1);
5106        if is_host {
5107            let task: TableId<HostTask> = TableId::new(task);
5108            &mut self.get_mut(task).unwrap().call_context
5109        } else {
5110            let task: TableId<GuestTask> = TableId::new(task);
5111            &mut self.get_mut(task).unwrap().call_context
5112        }
5113    }
5114
5115    /// Used by `ResourceTables` to record the scope of a borrow to get undone
5116    /// in the future.
5117    pub fn current_call_context_scope_id(&self) -> u32 {
5118        let (bits, is_host) = match self.current_thread {
5119            CurrentThread::Guest(id) => (id.task.rep(), false),
5120            CurrentThread::Host(id) => (id.rep(), true),
5121            CurrentThread::None => unreachable!(),
5122        };
5123        assert_eq!((bits << 1) >> 1, bits);
5124        (bits << 1) | u32::from(is_host)
5125    }
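
    // For illustration: a sketch (using a hypothetical rep value of 7) of the
    // round trip between `current_call_context_scope_id` above and
    // `call_context`, which packs a task rep into the upper bits and an
    // "is this a host task?" flag into the low bit:
    //
    //     let packed = (7 << 1) | u32::from(false); // guest task with rep 7
    //     let (rep, is_host) = (packed >> 1, packed & 1 == 1);
    //     assert_eq!((rep, is_host), (7, false));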
5126
5127    fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5128        *self.current_thread.guest().unwrap()
5129    }
5130
5131    fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5132        self.current_thread.host().unwrap()
5133    }
5134}
5135
5136/// Provide a type hint to compiler about the shape of a parameter lower
5137/// closure.
5138fn for_any_lower<
5139    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5140>(
5141    fun: F,
5142) -> F {
5143    fun
5144}
5145
5146/// Provide a type hint to compiler about the shape of a result lift closure.
5147fn for_any_lift<
5148    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5149>(
5150    fun: F,
5151) -> F {
5152    fun
5153}
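
// For illustration: the usual motivation for these identity helpers is that a
// closure literal passed straight to `Box::new` and coerced to a boxed
// `dyn FnOnce` such as `RawLower` may fail argument-type/lifetime inference.
// Wrapping the literal first pins down its shape, as `prepare_call` does
// below (a sketch):
//
//     let lower: RawLower = Box::new(for_any_lower(move |store, params| {
//         // ... lower parameters into `params` using `store` ...
//         Ok(())
//     }));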
5154
5155/// Wrap the specified future in a `poll_fn` which asserts that the future is
5156/// only polled from the event loop of the specified `Store`.
5157///
5158/// See `StoreContextMut::run_concurrent` for details.
5159fn checked<F: Future + Send + 'static>(
5160    id: StoreId,
5161    fut: F,
5162) -> impl Future<Output = F::Output> + Send + 'static {
5163    async move {
5164        let mut fut = pin!(fut);
5165        future::poll_fn(move |cx| {
5166            let message = "\
5167                `Future`s which depend on asynchronous component tasks, streams, or \
5168                futures to complete may only be polled from the event loop of the \
5169                store to which they belong.  Please use \
5170                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5171            ";
5172            tls::try_get(|store| {
5173                let matched = match store {
5174                    tls::TryGet::Some(store) => store.id() == id,
5175                    tls::TryGet::Taken | tls::TryGet::None => false,
5176                };
5177
5178                if !matched {
5179                    panic!("{message}")
5180                }
5181            });
5182            fut.as_mut().poll(cx)
5183        })
5184        .await
5185    }
5186}
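
// For illustration, the check above means a future wrapped by `checked` (such
// as the one returned from `queue_call` below) must be awaited from within
// its own store's event loop.  A sketch, eliding exact signatures and error
// handling, assuming `store` is the owning `Store` and `call` is such a
// future:
//
//     store.run_concurrent(async |_accessor| {
//         call.await // OK: polled from this store's event loop
//     })
//     .await;
//
// Polling `call` anywhere else (e.g. directly on an arbitrary executor) would
// hit the panic message above.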
5187
5188/// Assert that `StoreContextMut::run_concurrent` has not been called from
5189/// within a store's event loop.
5190fn check_recursive_run() {
5191    tls::try_get(|store| {
5192        if !matches!(store, tls::TryGet::None) {
5193            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5194        }
5195    });
5196}
5197
5198fn unpack_callback_code(code: u32) -> (u32, u32) {
5199    (code & 0xF, code >> 4)
5200}
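
// For illustration only (not part of the original module): the callback
// return value in the Component Model async ABI packs a code in the low four
// bits and, for codes that reference a waitable set, that set's handle in the
// remaining upper bits.  The exact numeric codes are elided here; the sketch
// below just exercises the bit splitting performed by `unpack_callback_code`.
#[cfg(test)]
mod unpack_callback_code_example {
    use super::unpack_callback_code;

    #[test]
    fn splits_code_and_payload() {
        // Hypothetical values chosen purely for illustration.
        let code: u32 = 2;
        let waitable_set: u32 = 3;
        let packed = (waitable_set << 4) | code;
        assert_eq!(unpack_callback_code(packed), (code, waitable_set));
    }
}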
5201
5202/// Helper struct for packaging parameters to be passed to
5203/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5204/// `waitable-set.poll`.
5205struct WaitableCheckParams {
5206    set: TableId<WaitableSet>,
5207    options: OptionsIndex,
5208    payload: u32,
5209}
5210
5211/// Indicates whether `ComponentInstance::waitable_check` is being called for
5212/// `waitable-set.wait` or `waitable-set.poll`.
5213enum WaitableCheck {
5214    Wait,
5215    Poll,
5216}
5217
5218/// Represents a guest task called from the host, prepared using `prepare_call`.
5219pub(crate) struct PreparedCall<R> {
5220    /// The guest export to be called
5221    handle: Func,
5222    /// The guest thread created by `prepare_call`
5223    thread: QualifiedThreadId,
5224    /// The number of lowered core Wasm parameters to pass to the call.
5225    param_count: usize,
5226    /// The `oneshot::Receiver` to which the result of the call will be
5227    /// delivered when it is available.
5228    rx: oneshot::Receiver<LiftedResult>,
5229    /// The `oneshot::Receiver` which will resolve when the task -- and any
5230    /// transitive subtasks -- have all exited.
5231    exit_rx: oneshot::Receiver<()>,
5232    _phantom: PhantomData<R>,
5233}
5234
5235impl<R> PreparedCall<R> {
5236    /// Get a copy of the `TaskId` for this `PreparedCall`.
5237    pub(crate) fn task_id(&self) -> TaskId {
5238        TaskId {
5239            task: self.thread.task,
5240        }
5241    }
5242}
5243
5244/// Represents a task created by `prepare_call`.
5245pub(crate) struct TaskId {
5246    task: TableId<GuestTask>,
5247}
5248
5249impl TaskId {
5250    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5251    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5252    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5253    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5254    /// and delete the task when all threads are done.
5255    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5256        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5257        if !task.already_lowered_parameters() {
5258            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5259        } else {
5260            task.host_future_state = HostFutureState::Dropped;
5261            if task.ready_to_delete() {
5262                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5263            }
5264        }
5265        Ok(())
5266    }
5267}
5268
5269/// Prepare a call to the specified exported Wasm function, providing functions
5270/// for lowering the parameters and lifting the result.
5271///
5272/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5273/// loop, use `queue_call`.
5274pub(crate) fn prepare_call<T, R>(
5275    mut store: StoreContextMut<T>,
5276    handle: Func,
5277    param_count: usize,
5278    host_future_present: bool,
5279    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5280    + Send
5281    + Sync
5282    + 'static,
5283    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5284    + Send
5285    + Sync
5286    + 'static,
5287) -> Result<PreparedCall<R>> {
5288    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5289
5290    let instance = handle.instance().id().get(store.0);
5291    let options = &instance.component().env_component().options[options];
5292    let ty = &instance.component().types()[ty];
5293    let async_function = ty.async_;
5294    let task_return_type = ty.results;
5295    let component_instance = raw_options.instance;
5296    let callback = options.callback.map(|i| instance.runtime_callback(i));
5297    let memory = options
5298        .memory()
5299        .map(|i| instance.runtime_memory(i))
5300        .map(SendSyncPtr::new);
5301    let string_encoding = options.string_encoding;
5302    let token = StoreToken::new(store.as_context_mut());
5303    let state = store.0.concurrent_state_mut();
5304
5305    let (tx, rx) = oneshot::channel();
5306    let (exit_tx, exit_rx) = oneshot::channel();
5307
5308    let instance = RuntimeInstance {
5309        instance: handle.instance().id().instance(),
5310        index: component_instance,
5311    };
5312    let caller = state.current_thread;
5313    let task = GuestTask::new(
5314        state,
5315        Box::new(for_any_lower(move |store, params| {
5316            lower_params(handle, token.as_context_mut(store), params)
5317        })),
5318        LiftResult {
5319            lift: Box::new(for_any_lift(move |store, result| {
5320                lift_result(handle, store, result)
5321            })),
5322            ty: task_return_type,
5323            memory,
5324            string_encoding,
5325        },
5326        Caller::Host {
5327            tx: Some(tx),
5328            exit_tx: Arc::new(exit_tx),
5329            host_future_present,
5330            caller,
5331        },
5332        callback.map(|callback| {
5333            let callback = SendSyncPtr::new(callback);
5334            let instance = handle.instance();
5335            Box::new(move |store: &mut dyn VMStore, event, handle| {
5336                let store = token.as_context_mut(store);
5337                // SAFETY: Per the contract of `prepare_call`, the callback
5338                // will remain valid at least as long as this task exists.
5339                unsafe { instance.call_callback(store, callback, event, handle) }
5340            }) as CallbackFn
5341        }),
5342        instance,
5343        async_function,
5344    )?;
5345
5346    let task = state.push(task)?;
5347    let thread = state.push(GuestThread::new_implicit(task))?;
5348    state.get_mut(task)?.threads.insert(thread);
5349
5350    if !store.0.may_enter(instance) {
5351        bail!(crate::Trap::CannotEnterComponent);
5352    }
5353
5354    Ok(PreparedCall {
5355        handle,
5356        thread: QualifiedThreadId { task, thread },
5357        param_count,
5358        rx,
5359        exit_rx,
5360        _phantom: PhantomData,
5361    })
5362}
5363
5364/// Queue a call previously prepared using `prepare_call` to be run as part of
5365/// the associated `ComponentInstance`'s event loop.
5366///
5367/// The returned future will resolve to the result once it is available, but
5368/// must only be polled via the instance's event loop. See
5369/// `StoreContextMut::run_concurrent` for details.
5370pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5371    mut store: StoreContextMut<T>,
5372    prepared: PreparedCall<R>,
5373) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5374    let PreparedCall {
5375        handle,
5376        thread,
5377        param_count,
5378        rx,
5379        exit_rx,
5380        ..
5381    } = prepared;
5382
5383    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5384
5385    Ok(checked(
5386        store.0.id(),
5387        rx.map(move |result| {
5388            result
5389                .map(|v| (*v.downcast().unwrap(), exit_rx))
5390                .map_err(crate::Error::from)
5391        }),
5392    ))
5393}
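
// For illustration, a host->guest call (e.g. what `[Typed]Func::call_concurrent`
// does at a high level) flows through these two functions roughly as follows
// (a sketch; parameter lists abbreviated):
//
//     let prepared = prepare_call(store.as_context_mut(), func, param_count,
//                                 host_future_present, lower_params, lift_result)?;
//     let call = queue_call(store.as_context_mut(), prepared)?;
//     // `call` resolves to the lifted result plus an exit receiver, and must
//     // be awaited from within this store's event loop (see `checked` above).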
5394
5395/// Queue a call previously prepared using `prepare_call` to be run as part of
5396/// the associated `ComponentInstance`'s event loop.
5397fn queue_call0<T: 'static>(
5398    store: StoreContextMut<T>,
5399    handle: Func,
5400    guest_thread: QualifiedThreadId,
5401    param_count: usize,
5402) -> Result<()> {
5403    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5404    let is_concurrent = raw_options.async_;
5405    let callback = raw_options.callback;
5406    let instance = handle.instance();
5407    let callee = handle.lifted_core_func(store.0);
5408    let post_return = handle.post_return_core_func(store.0);
5409    let callback = callback.map(|i| {
5410        let instance = instance.id().get(store.0);
5411        SendSyncPtr::new(instance.runtime_callback(i))
5412    });
5413
5414    log::trace!("queueing call {guest_thread:?}");
5415
5416    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5417    // (with signatures appropriate for this call) and will remain valid as
5418    // long as this instance is valid.
5419    unsafe {
5420        instance.queue_call(
5421            store,
5422            guest_thread,
5423            SendSyncPtr::new(callee),
5424            param_count,
5425            1,
5426            is_concurrent,
5427            callback,
5428            post_return.map(SendSyncPtr::new),
5429        )
5430    }
5431}