wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted import, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func};
54use crate::component::{
55    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, HandleTable, ResourceTables};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64    bail,
65    error::{Context as _, format_err},
66};
67use error_contexts::GlobalErrorContextRefCount;
68use futures::channel::oneshot;
69use futures::future::{self, FutureExt};
70use futures::stream::{FuturesUnordered, StreamExt};
71use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
72use std::any::Any;
73use std::borrow::ToOwned;
74use std::boxed::Box;
75use std::cell::UnsafeCell;
76use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
77use std::fmt;
78use std::future::Future;
79use std::marker::PhantomData;
80use std::mem::{self, ManuallyDrop, MaybeUninit};
81use std::ops::DerefMut;
82use std::pin::{Pin, pin};
83use std::ptr::{self, NonNull};
84use std::slice;
85use std::sync::Arc;
86use std::task::{Context, Poll, Waker};
87use std::vec::Vec;
88use table::{TableDebug, TableId};
89use wasmtime_environ::Trap;
90use wasmtime_environ::component::{
91    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
92    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
93    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
94    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
95    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
96};
97use wasmtime_environ::packed_option::ReservedValue;
98
99pub use abort::JoinHandle;
100pub use future_stream_any::{FutureAny, StreamAny};
101pub use futures_and_streams::{
102    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
103    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
104    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
105};
106pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
107
108mod abort;
109mod error_contexts;
110mod future_stream_any;
111mod futures_and_streams;
112pub(crate) mod table;
113pub(crate) mod tls;
114
115/// Constant defined in the Component Model spec to indicate that the async
116/// intrinsic (e.g. `future.write`) has not yet completed.
117const BLOCKED: u32 = 0xffff_ffff;
118
119/// Corresponds to `CallState` in the upstream spec.
120#[derive(Clone, Copy, Eq, PartialEq, Debug)]
121pub enum Status {
122    Starting = 0,
123    Started = 1,
124    Returned = 2,
125    StartCancelled = 3,
126    ReturnCancelled = 4,
127}
128
129impl Status {
130    /// Packs this status and the optional `waitable` provided into a 32-bit
131    /// result that the canonical ABI requires.
132    ///
133    /// The low 4 bits are reserved for the status while the upper 28 bits are
134    /// the waitable, if present.
135    pub fn pack(self, waitable: Option<u32>) -> u32 {
136        assert!(matches!(self, Status::Returned) == waitable.is_none());
137        let waitable = waitable.unwrap_or(0);
138        assert!(waitable < (1 << 28));
139        (waitable << 4) | (self as u32)
140    }
141}
142
143/// Corresponds to `EventCode` in the Component Model spec, plus related payload
144/// data.
145#[derive(Clone, Copy, Debug)]
146enum Event {
147    None,
148    Cancelled,
149    Subtask {
150        status: Status,
151    },
152    StreamRead {
153        code: ReturnCode,
154        pending: Option<(TypeStreamTableIndex, u32)>,
155    },
156    StreamWrite {
157        code: ReturnCode,
158        pending: Option<(TypeStreamTableIndex, u32)>,
159    },
160    FutureRead {
161        code: ReturnCode,
162        pending: Option<(TypeFutureTableIndex, u32)>,
163    },
164    FutureWrite {
165        code: ReturnCode,
166        pending: Option<(TypeFutureTableIndex, u32)>,
167    },
168}
169
170impl Event {
171    /// Lower this event to core Wasm integers for delivery to the guest.
172    ///
173    /// Note that the waitable handle, if any, is assumed to be lowered
174    /// separately.
175    fn parts(self) -> (u32, u32) {
176        const EVENT_NONE: u32 = 0;
177        const EVENT_SUBTASK: u32 = 1;
178        const EVENT_STREAM_READ: u32 = 2;
179        const EVENT_STREAM_WRITE: u32 = 3;
180        const EVENT_FUTURE_READ: u32 = 4;
181        const EVENT_FUTURE_WRITE: u32 = 5;
182        const EVENT_CANCELLED: u32 = 6;
183        match self {
184            Event::None => (EVENT_NONE, 0),
185            Event::Cancelled => (EVENT_CANCELLED, 0),
186            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
187            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
188            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
189            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
190            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
191        }
192    }
193}
194
195/// Corresponds to `CallbackCode` in the spec.
196mod callback_code {
197    pub const EXIT: u32 = 0;
198    pub const YIELD: u32 = 1;
199    pub const WAIT: u32 = 2;
200}
201
202/// A flag indicating that the callee is an async-lowered export.
203///
204/// This may be passed to the `async-start` intrinsic from a fused adapter.
205const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
206
207/// Provides access to either store data (via the `get` method) or the store
208/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
209/// instance to which the current host task belongs.
210///
211/// See [`Accessor::with`] for details.
212pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
213    store: StoreContextMut<'a, T>,
214    get_data: fn(&mut T) -> D::Data<'_>,
215}
216
217impl<'a, T, D> Access<'a, T, D>
218where
219    D: HasData + ?Sized,
220    T: 'static,
221{
222    /// Creates a new [`Access`] from its component parts.
223    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
224        Self { store, get_data }
225    }
226
227    /// Get mutable access to the store data.
228    pub fn data_mut(&mut self) -> &mut T {
229        self.store.data_mut()
230    }
231
232    /// Get mutable access to the store data.
233    pub fn get(&mut self) -> D::Data<'_> {
234        (self.get_data)(self.data_mut())
235    }
236
237    /// Spawn a background task.
238    ///
239    /// See [`Accessor::spawn`] for details.
240    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
241    where
242        T: 'static,
243    {
244        let accessor = Accessor {
245            get_data: self.get_data,
246            token: StoreToken::new(self.store.as_context_mut()),
247        };
248        self.store
249            .as_context_mut()
250            .spawn_with_accessor(accessor, task)
251    }
252
253    /// Returns the getter this accessor is using to project from `T` into
254    /// `D::Data`.
255    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
256        self.get_data
257    }
258}
259
260impl<'a, T, D> AsContext for Access<'a, T, D>
261where
262    D: HasData + ?Sized,
263    T: 'static,
264{
265    type Data = T;
266
267    fn as_context(&self) -> StoreContext<'_, T> {
268        self.store.as_context()
269    }
270}
271
272impl<'a, T, D> AsContextMut for Access<'a, T, D>
273where
274    D: HasData + ?Sized,
275    T: 'static,
276{
277    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
278        self.store.as_context_mut()
279    }
280}
281
282/// Provides scoped mutable access to store data in the context of a concurrent
283/// host task future.
284///
285/// This allows multiple host task futures to execute concurrently and access
286/// the store between (but not across) `await` points.
287///
288/// # Rationale
289///
290/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
291/// `D::Data<'_>`. The problem this is solving, however, is that it does not
292/// literally store these values. The basic problem is that when a concurrent
293/// host future is being polled it has access to `&mut T` (and the whole
294/// `Store`) but when it's not being polled it does not have access to these
295/// values. This reflects how the store is only ever polling one future at a
296/// time so the store is effectively being passed between futures.
297///
298/// Rust's `Future` trait, however, has no means of passing a `Store`
299/// temporarily between futures. The [`Context`](std::task::Context) type does
300/// not have the ability to attach arbitrary information to it at this time.
301/// This type, [`Accessor`], is used to bridge this expressivity gap.
302///
303/// The [`Accessor`] type here represents the ability to acquire, temporarily in
304/// a synchronous manner, the current store. The [`Accessor::with`] function
305/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
306/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
307/// not take an `async` closure as its argument, instead it's a synchronous
308/// closure which must complete during on run of `Future::poll`. This reflects
309/// how the store is temporarily made available while a host future is being
310/// polled.
311///
312/// # Implementation
313///
314/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
315/// this type additionally doesn't even have a lifetime parameter. This is
316/// instead a representation of proof of the ability to acquire these while a
317/// future is being polled. Wasmtime will, when it polls a host future,
318/// configure ambient state such that the `Accessor` that a future closes over
319/// will work and be able to access the store.
320///
321/// This has a number of implications for users such as:
322///
323/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
324///   the lifetime of a single future.
325/// * A future is expected to, however, close over an `Accessor` and keep it
326///   alive probably for the duration of the entire future.
327/// * Different host futures will be given different `Accessor`s, and that's
328///   intentional.
329/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
330///   alleviates some otherwise required bounds to be written down.
331///
332/// # Using `Accessor` in `Drop`
333///
334/// The methods on `Accessor` are only expected to work in the context of
335/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
336/// host future can be dropped at any time throughout the system and Wasmtime
337/// store context is not necessarily available at that time. It's recommended to
338/// not use `Accessor` methods in anything connected to a `Drop` implementation
339/// as they will panic and have unintended results. If you run into this though
340/// feel free to file an issue on the Wasmtime repository.
341pub struct Accessor<T: 'static, D = HasSelf<T>>
342where
343    D: HasData + ?Sized,
344{
345    token: StoreToken<T>,
346    get_data: fn(&mut T) -> D::Data<'_>,
347}
348
349/// A helper trait to take any type of accessor-with-data in functions.
350///
351/// This trait is similar to [`AsContextMut`] except that it's used when
352/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
353/// [`Accessor`] is the main type used in concurrent settings and is passed to
354/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
355///
356/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
357/// this trait. This effectively means that regardless of the `D` in
358/// `Accessor<T, D>` it can still be passed to a function which just needs a
359/// store accessor.
360///
361/// Acquiring an [`Accessor`] can be done through
362/// [`StoreContextMut::run_concurrent`] for example or in a host function
363/// through
364/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
365pub trait AsAccessor {
366    /// The `T` in `Store<T>` that this accessor refers to.
367    type Data: 'static;
368
369    /// The `D` in `Accessor<T, D>`, or the projection out of
370    /// `Self::Data`.
371    type AccessorData: HasData + ?Sized;
372
373    /// Returns the accessor that this is referring to.
374    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
375}
376
377impl<T: AsAccessor + ?Sized> AsAccessor for &T {
378    type Data = T::Data;
379    type AccessorData = T::AccessorData;
380
381    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
382        T::as_accessor(self)
383    }
384}
385
386impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
387    type Data = T;
388    type AccessorData = D;
389
390    fn as_accessor(&self) -> &Accessor<T, D> {
391        self
392    }
393}
394
395// Note that it is intentional at this time that `Accessor` does not actually
396// store `&mut T` or anything similar. This distinctly enables the `Accessor`
397// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
398// that matter). This is used to ergonomically simplify bindings where the
399// majority of the time `Accessor` is closed over in a future which then needs
400// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
401// you already have to write `T: 'static`...) it helps to avoid this.
402//
403// Note as well that `Accessor` doesn't actually store its data at all. Instead
404// it's more of a "proof" of what can be accessed from TLS. API design around
405// `Accessor` and functions like `Linker::func_wrap_concurrent` are
406// intentionally made to ensure that `Accessor` is ideally only used in the
407// context that TLS variables are actually set. For example host functions are
408// given `&Accessor`, not `Accessor`, and this prevents them from persisting
409// the value outside of a future. Within the future the TLS variables are all
410// guaranteed to be set while the future is being polled.
411//
412// Finally though this is not an ironclad guarantee, but nor does it need to be.
413// The TLS APIs are designed to panic or otherwise model usage where they're
414// called recursively or similar. It's hoped that code cannot be constructed to
415// actually hit this at runtime but this is not a safety requirement at this
416// time.
417const _: () = {
418    const fn assert<T: Send + Sync>() {}
419    assert::<Accessor<UnsafeCell<u32>>>();
420};
421
422impl<T> Accessor<T> {
423    /// Creates a new `Accessor` backed by the specified functions.
424    ///
425    /// - `get`: used to retrieve the store
426    ///
427    /// - `get_data`: used to "project" from the store's associated data to
428    /// another type (e.g. a field of that data or a wrapper around it).
429    ///
430    /// - `spawn`: used to queue spawned background tasks to be run later
431    pub(crate) fn new(token: StoreToken<T>) -> Self {
432        Self {
433            token,
434            get_data: |x| x,
435        }
436    }
437}
438
439impl<T, D> Accessor<T, D>
440where
441    D: HasData + ?Sized,
442{
443    /// Run the specified closure, passing it mutable access to the store.
444    ///
445    /// This function is one of the main building blocks of the [`Accessor`]
446    /// type. This yields synchronous, blocking, access to the store via an
447    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
448    /// providing the ability to access `D` via [`Access::get`]. Note that the
449    /// `fun` here is given only temporary access to the store and `T`/`D`
450    /// meaning that the return value `R` here is not allowed to capture borrows
451    /// into the two. If access is needed to data within `T` or `D` outside of
452    /// this closure then it must be `clone`d out, for example.
453    ///
454    /// # Panics
455    ///
456    /// This function will panic if it is call recursively with any other
457    /// accessor already in scope. For example if `with` is called within `fun`,
458    /// then this function will panic. It is up to the embedder to ensure that
459    /// this does not happen.
460    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
461        tls::get(|vmstore| {
462            fun(Access {
463                store: self.token.as_context_mut(vmstore),
464                get_data: self.get_data,
465            })
466        })
467    }
468
469    /// Returns the getter this accessor is using to project from `T` into
470    /// `D::Data`.
471    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
472        self.get_data
473    }
474
475    /// Changes this accessor to access `D2` instead of the current type
476    /// parameter `D`.
477    ///
478    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
479    ///
480    /// # Panics
481    ///
482    /// When using this API the returned value is disconnected from `&self` and
483    /// the lifetime binding the `self` argument. An `Accessor` only works
484    /// within the context of the closure or async closure that it was
485    /// originally given to, however. This means that due to the fact that the
486    /// returned value has no lifetime connection it's possible to use the
487    /// accessor outside of `&self`, the original accessor, and panic.
488    ///
489    /// The returned value should only be used within the scope of the original
490    /// `Accessor` that `self` refers to.
491    pub fn with_getter<D2: HasData>(
492        &self,
493        get_data: fn(&mut T) -> D2::Data<'_>,
494    ) -> Accessor<T, D2> {
495        Accessor {
496            token: self.token,
497            get_data,
498        }
499    }
500
501    /// Spawn a background task which will receive an `&Accessor<T, D>` and
502    /// run concurrently with any other tasks in progress for the current
503    /// store.
504    ///
505    /// This is particularly useful for host functions which return a `stream`
506    /// or `future` such that the code to write to the write end of that
507    /// `stream` or `future` must run after the function returns.
508    ///
509    /// The returned [`JoinHandle`] may be used to cancel the task.
510    ///
511    /// # Panics
512    ///
513    /// Panics if called within a closure provided to the [`Accessor::with`]
514    /// function. This can only be called outside an active invocation of
515    /// [`Accessor::with`].
516    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
517    where
518        T: 'static,
519    {
520        let accessor = self.clone_for_spawn();
521        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
522    }
523
524    fn clone_for_spawn(&self) -> Self {
525        Self {
526            token: self.token,
527            get_data: self.get_data,
528        }
529    }
530}
531
532/// Represents a task which may be provided to `Accessor::spawn`,
533/// `Accessor::forward`, or `StorecContextMut::spawn`.
534// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
535// option.
536//
537// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
538// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
539// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
540// Send + Sync + 'static` fails with a type mismatch error when we try to pass
541// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
542// best we can do for the time being.
543pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
544where
545    D: HasData + ?Sized,
546{
547    /// Run the task.
548    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
549}
550
551/// Represents parameter and result metadata for the caller side of a
552/// guest->guest call orchestrated by a fused adapter.
553enum CallerInfo {
554    /// Metadata for a call to an async-lowered import
555    Async {
556        params: Vec<ValRaw>,
557        has_result: bool,
558    },
559    /// Metadata for a call to an sync-lowered import
560    Sync {
561        params: Vec<ValRaw>,
562        result_count: u32,
563    },
564}
565
566/// Indicates how a guest task is waiting on a waitable set.
567enum WaitMode {
568    /// The guest task is waiting using `task.wait`
569    Fiber(StoreFiber<'static>),
570    /// The guest task is waiting via a callback declared as part of an
571    /// async-lifted export.
572    Callback(Instance),
573}
574
575/// Represents the reason a fiber is suspending itself.
576#[derive(Debug)]
577enum SuspendReason {
578    /// The fiber is waiting for an event to be delivered to the specified
579    /// waitable set or task.
580    Waiting {
581        set: TableId<WaitableSet>,
582        thread: QualifiedThreadId,
583        skip_may_block_check: bool,
584    },
585    /// The fiber has finished handling its most recent work item and is waiting
586    /// for another (or to be dropped if it is no longer needed).
587    NeedWork,
588    /// The fiber is yielding and should be resumed once other tasks have had a
589    /// chance to run.
590    Yielding {
591        thread: QualifiedThreadId,
592        skip_may_block_check: bool,
593    },
594    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
595    ExplicitlySuspending {
596        thread: QualifiedThreadId,
597        skip_may_block_check: bool,
598    },
599}
600
601/// Represents a pending call into guest code for a given guest task.
602enum GuestCallKind {
603    /// Indicates there's an event to deliver to the task, possibly related to a
604    /// waitable set the task has been waiting on or polling.
605    DeliverEvent {
606        /// The instance to which the task belongs.
607        instance: Instance,
608        /// The waitable set the event belongs to, if any.
609        ///
610        /// If this is `None` the event will be waiting in the
611        /// `GuestTask::event` field for the task.
612        set: Option<TableId<WaitableSet>>,
613    },
614    /// Indicates that a new guest task call is pending and may be executed
615    /// using the specified closure.
616    ///
617    /// If the closure returns `Ok(Some(call))`, the `call` should be run
618    /// immediately using `handle_guest_call`.
619    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
620    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
621}
622
623impl fmt::Debug for GuestCallKind {
624    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
625        match self {
626            Self::DeliverEvent { instance, set } => f
627                .debug_struct("DeliverEvent")
628                .field("instance", instance)
629                .field("set", set)
630                .finish(),
631            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
632            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
633        }
634    }
635}
636
637/// Represents a pending call into guest code for a given guest thread.
638#[derive(Debug)]
639struct GuestCall {
640    thread: QualifiedThreadId,
641    kind: GuestCallKind,
642}
643
644impl GuestCall {
645    /// Returns whether or not the call is ready to run.
646    ///
647    /// A call will not be ready to run if either:
648    ///
649    /// - the (sub-)component instance to be called has already been entered and
650    /// cannot be reentered until an in-progress call completes
651    ///
652    /// - the call is for a not-yet started task and the (sub-)component
653    /// instance to be called has backpressure enabled
654    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
655        let instance = store
656            .concurrent_state_mut()
657            .get_mut(self.thread.task)?
658            .instance;
659        let state = store.instance_state(instance);
660
661        let ready = match &self.kind {
662            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
663            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
664            GuestCallKind::StartExplicit(_) => true,
665        };
666        log::trace!(
667            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
668            state.do_not_enter,
669            state.backpressure
670        );
671        Ok(ready)
672    }
673}
674
675/// Job to be run on a worker fiber.
676enum WorkerItem {
677    GuestCall(GuestCall),
678    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
679}
680
681/// Represents a pending work item to be handled by the event loop for a given
682/// component instance.
683enum WorkItem {
684    /// A host task to be pushed to `ConcurrentState::futures`.
685    PushFuture(AlwaysMut<HostTaskFuture>),
686    /// A fiber to resume.
687    ResumeFiber(StoreFiber<'static>),
688    /// A pending call into guest code for a given guest task.
689    GuestCall(GuestCall),
690    /// A job to run on a worker fiber.
691    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
692}
693
694impl fmt::Debug for WorkItem {
695    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
696        match self {
697            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
698            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
699            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
700            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
701        }
702    }
703}
704
705/// Whether a suspension intrinsic was cancelled or completed
706#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
707pub(crate) enum WaitResult {
708    Cancelled,
709    Completed,
710}
711
712/// Poll the specified future until it completes on behalf of a guest->host call
713/// using a sync-lowered import.
714///
715/// This is similar to `Instance::first_poll` except it's for sync-lowered
716/// imports, meaning we don't need to handle cancellation and we can block the
717/// caller until the task completes, at which point the caller can handle
718/// lowering the result to the guest's stack and linear memory.
719pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
720    store: &mut dyn VMStore,
721    future: impl Future<Output = Result<R>> + Send + 'static,
722    caller_instance: RuntimeInstance,
723) -> Result<R> {
724    store.check_may_leave(caller_instance)?;
725
726    let state = store.concurrent_state_mut();
727
728    let caller = state.guest_thread.unwrap();
729
730    // Save any existing result stashed in `GuestTask::result` so we can replace
731    // it with the new result.
732    let old_result = state
733        .get_mut(caller.task)
734        .with_context(|| format!("bad handle: {caller:?}"))?
735        .result
736        .take();
737
738    // Add a temporary host task into the table so we can track its progress.
739    // Note that we'll never allocate a waitable handle for the guest since
740    // we're being called synchronously.
741    let task = state.push(HostTask::new(caller_instance, None))?;
742
743    log::trace!("new host task child of {caller:?}: {task:?}");
744
745    // Wrap the future in a closure which will take care of stashing the result
746    // in `GuestTask::result` and resuming this fiber when the host task
747    // completes.
748    let mut future = Box::pin(async move {
749        let result = future.await?;
750        tls::get(move |store| {
751            let state = store.concurrent_state_mut();
752            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
753
754            Waitable::Host(task).set_event(
755                state,
756                Some(Event::Subtask {
757                    status: Status::Returned,
758                }),
759            )?;
760
761            Ok(())
762        })
763    }) as HostTaskFuture;
764
765    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
766    // add the future to `ConcurrentState::futures` and poll it automatically
767    // from the event loop if it doesn't complete immediately here.
768    let poll = tls::set(store, || {
769        future
770            .as_mut()
771            .poll(&mut Context::from_waker(&Waker::noop()))
772    });
773
774    match poll {
775        Poll::Ready(result) => {
776            // It completed immediately; check the result and delete the task.
777            result?;
778            log::trace!("delete host task {task:?} (already ready)");
779            store.concurrent_state_mut().delete(task)?;
780        }
781        Poll::Pending => {
782            // It did not complete immediately; add it to
783            // `ConcurrentState::futures` so it will be polled via the event
784            // loop; then use `GuestTask::sync_call_set` to wait for the task to
785            // complete, suspending the current fiber until it does so.
786            let state = store.concurrent_state_mut();
787            state.push_future(future);
788
789            let set = state.get_mut(caller.task)?.sync_call_set;
790            Waitable::Host(task).join(state, Some(set))?;
791
792            store.suspend(SuspendReason::Waiting {
793                set,
794                thread: caller,
795                skip_may_block_check: false,
796            })?;
797        }
798    }
799
800    // Retrieve and return the result.
801    Ok(*mem::replace(
802        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
803        old_result,
804    )
805    .unwrap()
806    .downcast()
807    .unwrap())
808}
809
810/// Execute the specified guest call.
811fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
812    let mut next = Some(call);
813    while let Some(call) = next.take() {
814        match call.kind {
815            GuestCallKind::DeliverEvent { instance, set } => {
816                let (event, waitable) = instance
817                    .get_event(store, call.thread.task, set, true)?
818                    .unwrap();
819                let state = store.concurrent_state_mut();
820                let task = state.get_mut(call.thread.task)?;
821                let runtime_instance = task.instance;
822                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
823
824                log::trace!(
825                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
826                    call.thread,
827                );
828
829                let old_thread = store.set_thread(Some(call.thread));
830                log::trace!(
831                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
832                    call.thread
833                );
834
835                store.maybe_push_call_context(call.thread.task)?;
836
837                store.enter_instance(runtime_instance);
838
839                let callback = store
840                    .concurrent_state_mut()
841                    .get_mut(call.thread.task)?
842                    .callback
843                    .take()
844                    .unwrap();
845
846                let code = callback(store, event, handle)?;
847
848                store
849                    .concurrent_state_mut()
850                    .get_mut(call.thread.task)?
851                    .callback = Some(callback);
852
853                store.exit_instance(runtime_instance)?;
854
855                store.maybe_pop_call_context(call.thread.task)?;
856
857                store.set_thread(old_thread);
858
859                next = instance.handle_callback_code(
860                    store,
861                    call.thread,
862                    runtime_instance.index,
863                    code,
864                )?;
865
866                log::trace!(
867                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
868                );
869            }
870            GuestCallKind::StartImplicit(fun) => {
871                next = fun(store)?;
872            }
873            GuestCallKind::StartExplicit(fun) => {
874                fun(store)?;
875            }
876        }
877    }
878
879    Ok(())
880}
881
882impl<T> Store<T> {
883    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
884    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
885    where
886        T: Send + 'static,
887    {
888        ensure!(
889            self.as_context().0.concurrency_support(),
890            "cannot use `run_concurrent` when Config::concurrency_support disabled",
891        );
892        self.as_context_mut().run_concurrent(fun).await
893    }
894
895    #[doc(hidden)]
896    pub fn assert_concurrent_state_empty(&mut self) {
897        self.as_context_mut().assert_concurrent_state_empty();
898    }
899
900    /// Convenience wrapper for [`StoreContextMut::spawn`].
901    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
902    where
903        T: 'static,
904    {
905        self.as_context_mut().spawn(task)
906    }
907}
908
909impl<T> StoreContextMut<'_, T> {
910    /// Assert that all the relevant tables and queues in the concurrent state
911    /// for this store are empty.
912    ///
913    /// This is for sanity checking in integration tests
914    /// (e.g. `component-async-tests`) that the relevant state has been cleared
915    /// after each test concludes.  This should help us catch leaks, e.g. guest
916    /// tasks which haven't been deleted despite having completed and having
917    /// been dropped by their supertasks.
918    #[doc(hidden)]
919    pub fn assert_concurrent_state_empty(self) {
920        let store = self.0;
921        store
922            .store_data_mut()
923            .components
924            .assert_instance_states_empty();
925        let state = store.concurrent_state_mut();
926        assert!(
927            state.table.get_mut().is_empty(),
928            "non-empty table: {:?}",
929            state.table.get_mut()
930        );
931        assert!(state.high_priority.is_empty());
932        assert!(state.low_priority.is_empty());
933        assert!(state.guest_thread.is_none());
934        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
935        assert!(state.global_error_context_ref_counts.is_empty());
936    }
937
938    /// Spawn a background task to run as part of this instance's event loop.
939    ///
940    /// The task will receive an `&Accessor<U>` and run concurrently with
941    /// any other tasks in progress for the instance.
942    ///
943    /// Note that the task will only make progress if and when the event loop
944    /// for this instance is run.
945    ///
946    /// The returned [`SpawnHandle`] may be used to cancel the task.
947    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
948    where
949        T: 'static,
950    {
951        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
952        self.spawn_with_accessor(accessor, task)
953    }
954
955    /// Internal implementation of `spawn` functions where a `store` is
956    /// available along with an `Accessor`.
957    fn spawn_with_accessor<D>(
958        self,
959        accessor: Accessor<T, D>,
960        task: impl AccessorTask<T, D>,
961    ) -> JoinHandle
962    where
963        T: 'static,
964        D: HasData + ?Sized,
965    {
966        // Create an "abortable future" here where internally the future will
967        // hook calls to poll and possibly spawn more background tasks on each
968        // iteration.
969        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
970        self.0
971            .concurrent_state_mut()
972            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
973        handle
974    }
975
976    /// Run the specified closure `fun` to completion as part of this store's
977    /// event loop.
978    ///
979    /// This will run `fun` as part of this store's event loop until it
980    /// yields a result.  `fun` is provided an [`Accessor`], which provides
981    /// controlled access to the store and its data.
982    ///
983    /// This function can be used to invoke [`Func::call_concurrent`] for
984    /// example within the async closure provided here.
985    ///
986    /// This function will unconditionally return an error if
987    /// [`Config::concurrency_support`] is disabled.
988    ///
989    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
990    ///
991    /// # Store-blocking behavior
992    ///
993    /// At this time there are certain situations in which the `Future` returned
994    /// by the `AsyncFnOnce` passed to this function will not be polled for an
995    /// extended period of time, despite one or more `Waker::wake` events having
996    /// occurred for the task to which it belongs.  This can manifest as the
997    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
998    /// the `Store` being held by e.g. a blocking host function, preventing the
999    /// `Future` from being polled. A canonical example of this is when the
1000    /// `fun` provided to this function attempts to set a timeout for an
1001    /// invocation of a wasm function. In this situation the async closure is
1002    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1003    /// to elapse. At this time this setup will not always work and the timeout
1004    /// may not reliably fire.
1005    ///
1006    /// This function will not block the current thread and as such is always
1007    /// suitable to run in an `async` context, but the current implementation of
1008    /// Wasmtime can lead to situations where a certain wasm computation is
1009    /// required to make progress the closure to make progress. This is an
1010    /// artifact of Wasmtime's historical implementation of `async` functions
1011    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1012    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1013    /// progress for a readiness notification of (b) to get delivered.
1014    ///
1015    /// This effectively means that it's not possible to reliably perform a
1016    /// "select" operation within the `fun` closure, which timeouts for example
1017    /// are based on. Fixing this requires some relatively major refactoring
1018    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1019    /// that is intended to be fixed one day. In the meantime it's recommended
1020    /// to apply timeouts or such to the entire `run_concurrent` call itself
1021    /// rather than internally.
1022    ///
1023    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1024    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1025    ///
1026    /// # Example
1027    ///
1028    /// ```
1029    /// # use {
1030    /// #   wasmtime::{
1031    /// #     error::{Result},
1032    /// #     component::{ Component, Linker, Resource, ResourceTable},
1033    /// #     Config, Engine, Store
1034    /// #   },
1035    /// # };
1036    /// #
1037    /// # struct MyResource(u32);
1038    /// # struct Ctx { table: ResourceTable }
1039    /// #
1040    /// # async fn foo() -> Result<()> {
1041    /// # let mut config = Config::new();
1042    /// # let engine = Engine::new(&config)?;
1043    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1044    /// # let mut linker = Linker::new(&engine);
1045    /// # let component = Component::new(&engine, "")?;
1046    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1047    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1048    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1049    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1050    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1051    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1052    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1053    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1054    ///    Ok(())
1055    /// }).await??;
1056    /// # Ok(())
1057    /// # }
1058    /// ```
1059    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1060    where
1061        T: Send + 'static,
1062    {
1063        ensure!(
1064            self.0.concurrency_support(),
1065            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1066        );
1067        self.do_run_concurrent(fun, false).await
1068    }
1069
1070    pub(super) async fn run_concurrent_trap_on_idle<R>(
1071        self,
1072        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1073    ) -> Result<R>
1074    where
1075        T: Send + 'static,
1076    {
1077        self.do_run_concurrent(fun, true).await
1078    }
1079
1080    async fn do_run_concurrent<R>(
1081        mut self,
1082        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1083        trap_on_idle: bool,
1084    ) -> Result<R>
1085    where
1086        T: Send + 'static,
1087    {
1088        debug_assert!(self.0.concurrency_support());
1089        check_recursive_run();
1090        let token = StoreToken::new(self.as_context_mut());
1091
1092        struct Dropper<'a, T: 'static, V> {
1093            store: StoreContextMut<'a, T>,
1094            value: ManuallyDrop<V>,
1095        }
1096
1097        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1098            fn drop(&mut self) {
1099                tls::set(self.store.0, || {
1100                    // SAFETY: Here we drop the value without moving it for the
1101                    // first and only time -- per the contract for `Drop::drop`,
1102                    // this code won't run again, and the `value` field will no
1103                    // longer be accessible.
1104                    unsafe { ManuallyDrop::drop(&mut self.value) }
1105                });
1106            }
1107        }
1108
1109        let accessor = &Accessor::new(token);
1110        let dropper = &mut Dropper {
1111            store: self,
1112            value: ManuallyDrop::new(fun(accessor)),
1113        };
1114        // SAFETY: We never move `dropper` nor its `value` field.
1115        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1116
1117        dropper
1118            .store
1119            .as_context_mut()
1120            .poll_until(future, trap_on_idle)
1121            .await
1122    }
1123
1124    /// Run this store's event loop.
1125    ///
1126    /// The returned future will resolve when the specified future completes or,
1127    /// if `trap_on_idle` is true, when the event loop can't make further
1128    /// progress.
1129    async fn poll_until<R>(
1130        mut self,
1131        mut future: Pin<&mut impl Future<Output = R>>,
1132        trap_on_idle: bool,
1133    ) -> Result<R>
1134    where
1135        T: Send + 'static,
1136    {
1137        struct Reset<'a, T: 'static> {
1138            store: StoreContextMut<'a, T>,
1139            futures: Option<FuturesUnordered<HostTaskFuture>>,
1140        }
1141
1142        impl<'a, T> Drop for Reset<'a, T> {
1143            fn drop(&mut self) {
1144                if let Some(futures) = self.futures.take() {
1145                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1146                }
1147            }
1148        }
1149
1150        loop {
1151            // Take `ConcurrentState::futures` out of the store so we can poll
1152            // it while also safely giving any of the futures inside access to
1153            // `self`.
1154            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1155            let mut reset = Reset {
1156                store: self.as_context_mut(),
1157                futures,
1158            };
1159            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1160
1161            enum PollResult<R> {
1162                Complete(R),
1163                ProcessWork(Vec<WorkItem>),
1164            }
1165            let result = future::poll_fn(|cx| {
1166                // First, poll the future we were passed as an argument and
1167                // return immediately if it's ready.
1168                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1169                    return Poll::Ready(Ok(PollResult::Complete(value)));
1170                }
1171
1172                // Next, poll `ConcurrentState::futures` (which includes any
1173                // pending host tasks and/or background tasks), returning
1174                // immediately if one of them fails.
1175                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1176                    Poll::Ready(Some(output)) => {
1177                        match output {
1178                            Err(e) => return Poll::Ready(Err(e)),
1179                            Ok(()) => {}
1180                        }
1181                        Poll::Ready(true)
1182                    }
1183                    Poll::Ready(None) => Poll::Ready(false),
1184                    Poll::Pending => Poll::Pending,
1185                };
1186
1187                // Next, collect the next batch of work items to process, if any.
1188                // This will be either all of the high-priority work items, or if
1189                // there are none, a single low-priority work item.
1190                let state = reset.store.0.concurrent_state_mut();
1191                let ready = state.collect_work_items_to_run();
1192                if !ready.is_empty() {
1193                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1194                }
1195
1196                // Finally, if we have nothing else to do right now, determine what to do
1197                // based on whether there are any pending futures in
1198                // `ConcurrentState::futures`.
1199                return match next {
1200                    Poll::Ready(true) => {
1201                        // In this case, one of the futures in
1202                        // `ConcurrentState::futures` completed
1203                        // successfully, so we return now and continue
1204                        // the outer loop in case there is another one
1205                        // ready to complete.
1206                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1207                    }
1208                    Poll::Ready(false) => {
1209                        // Poll the future we were passed one last time
1210                        // in case one of `ConcurrentState::futures` had
1211                        // the side effect of unblocking it.
1212                        if let Poll::Ready(value) =
1213                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1214                        {
1215                            Poll::Ready(Ok(PollResult::Complete(value)))
1216                        } else {
1217                            // In this case, there are no more pending
1218                            // futures in `ConcurrentState::futures`,
1219                            // there are no remaining work items, _and_
1220                            // the future we were passed as an argument
1221                            // still hasn't completed.
1222                            if trap_on_idle {
1223                                // `trap_on_idle` is true, so we exit
1224                                // immediately.
1225                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1226                            } else {
1227                                // `trap_on_idle` is false, so we assume
1228                                // that future will wake up and give us
1229                                // more work to do when it's ready to.
1230                                Poll::Pending
1231                            }
1232                        }
1233                    }
1234                    // There is at least one pending future in
1235                    // `ConcurrentState::futures` and we have nothing
1236                    // else to do but wait for now, so we return
1237                    // `Pending`.
1238                    Poll::Pending => Poll::Pending,
1239                };
1240            })
1241            .await;
1242
1243            // Put the `ConcurrentState::futures` back into the store before we
1244            // return or handle any work items since one or more of those items
1245            // might append more futures.
1246            drop(reset);
1247
1248            match result? {
1249                // The future we were passed as an argument completed, so we
1250                // return the result.
1251                PollResult::Complete(value) => break Ok(value),
1252                // The future we were passed has not yet completed, so handle
1253                // any work items and then loop again.
1254                PollResult::ProcessWork(ready) => {
1255                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1256                        store: StoreContextMut<'a, T>,
1257                        ready: I,
1258                    }
1259
1260                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1261                        fn drop(&mut self) {
1262                            while let Some(item) = self.ready.next() {
1263                                match item {
1264                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1265                                    WorkItem::PushFuture(future) => {
1266                                        tls::set(self.store.0, move || drop(future))
1267                                    }
1268                                    _ => {}
1269                                }
1270                            }
1271                        }
1272                    }
1273
1274                    let mut dispose = Dispose {
1275                        store: self.as_context_mut(),
1276                        ready: ready.into_iter(),
1277                    };
1278
1279                    while let Some(item) = dispose.ready.next() {
1280                        dispose
1281                            .store
1282                            .as_context_mut()
1283                            .handle_work_item(item)
1284                            .await?;
1285                    }
1286                }
1287            }
1288        }
1289    }
1290
1291    /// Handle the specified work item, possibly resuming a fiber if applicable.
1292    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1293    where
1294        T: Send,
1295    {
1296        log::trace!("handle work item {item:?}");
1297        match item {
1298            WorkItem::PushFuture(future) => {
1299                self.0
1300                    .concurrent_state_mut()
1301                    .futures
1302                    .get_mut()
1303                    .as_mut()
1304                    .unwrap()
1305                    .push(future.into_inner());
1306            }
1307            WorkItem::ResumeFiber(fiber) => {
1308                self.0.resume_fiber(fiber).await?;
1309            }
1310            WorkItem::GuestCall(call) => {
1311                if call.is_ready(self.0)? {
1312                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1313                } else {
1314                    let state = self.0.concurrent_state_mut();
1315                    let task = state.get_mut(call.thread.task)?;
1316                    if !task.starting_sent {
1317                        task.starting_sent = true;
1318                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1319                            Waitable::Guest(call.thread.task).set_event(
1320                                state,
1321                                Some(Event::Subtask {
1322                                    status: Status::Starting,
1323                                }),
1324                            )?;
1325                        }
1326                    }
1327
1328                    let instance = state.get_mut(call.thread.task)?.instance;
1329                    self.0
1330                        .instance_state(instance)
1331                        .pending
1332                        .insert(call.thread, call.kind);
1333                }
1334            }
1335            WorkItem::WorkerFunction(fun) => {
1336                self.run_on_worker(WorkerItem::Function(fun)).await?;
1337            }
1338        }
1339
1340        Ok(())
1341    }
1342
1343    /// Execute the specified guest call on a worker fiber.
1344    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1345    where
1346        T: Send,
1347    {
1348        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1349            fiber
1350        } else {
1351            fiber::make_fiber(self.0, move |store| {
1352                loop {
1353                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1354                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1355                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1356                    }
1357
1358                    store.suspend(SuspendReason::NeedWork)?;
1359                }
1360            })?
1361        };
1362
1363        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1364        assert!(worker_item.is_none());
1365        *worker_item = Some(item);
1366
1367        self.0.resume_fiber(worker).await
1368    }
1369
1370    /// Wrap the specified host function in a future which will call it, passing
1371    /// it an `&Accessor<T>`.
1372    ///
1373    /// See the `Accessor` documentation for details.
1374    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1375    where
1376        T: 'static,
1377        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1378            + Send
1379            + Sync
1380            + 'static,
1381        R: Send + Sync + 'static,
1382    {
1383        let token = StoreToken::new(self);
1384        async move {
1385            let mut accessor = Accessor::new(token);
1386            closure(&mut accessor).await
1387        }
1388    }
1389}
1390
1391impl StoreOpaque {
1392    fn check_may_leave(&mut self, instance: RuntimeInstance) -> Result<()> {
1393        Instance::from_wasmtime(self, instance.instance)
1394            .id()
1395            .get(self)
1396            .check_may_leave(instance.index)?;
1397
1398        // While we're here, verify that the caller instance matches the most
1399        // recent task pushed onto the task stack:
1400        let state = self.concurrent_state_mut();
1401        let caller = state.guest_thread.unwrap();
1402        assert_eq!(state.get_mut(caller.task)?.instance, instance);
1403
1404        Ok(())
1405    }
1406
1407    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1408    /// guest-to-guest call or a sync host-to-guest call.
1409    ///
1410    /// This task will only be used for the purpose of handling calls to
1411    /// intrinsic functions; both parameter lowering and result lifting are
1412    /// assumed to be taken care of elsewhere.
1413    pub(crate) fn enter_sync_call(
1414        &mut self,
1415        guest_caller: Option<RuntimeInstance>,
1416        callee_async: bool,
1417        callee: RuntimeInstance,
1418    ) -> Result<()> {
1419        log::trace!("enter sync call {callee:?}");
1420
1421        let state = self.concurrent_state_mut();
1422        let thread = state.guest_thread;
1423        let instance = if let Some(thread) = thread {
1424            Some(state.get_mut(thread.task)?.instance)
1425        } else {
1426            None
1427        };
1428        let task = GuestTask::new(
1429            state,
1430            Box::new(move |_, _| unreachable!()),
1431            LiftResult {
1432                lift: Box::new(move |_, _| unreachable!()),
1433                ty: TypeTupleIndex::reserved_value(),
1434                memory: None,
1435                string_encoding: StringEncoding::Utf8,
1436            },
1437            if let Some(caller) = guest_caller {
1438                assert_eq!(caller, instance.unwrap());
1439                Caller::Guest {
1440                    thread: thread.unwrap(),
1441                }
1442            } else {
1443                Caller::Host {
1444                    tx: None,
1445                    exit_tx: Arc::new(oneshot::channel().0),
1446                    host_future_present: false,
1447                    call_post_return_automatically: false,
1448                    caller: state.guest_thread,
1449                }
1450            },
1451            None,
1452            callee,
1453            callee_async,
1454        )?;
1455
1456        let guest_task = state.push(task)?;
1457        let new_thread = GuestThread::new_implicit(guest_task);
1458        let guest_thread = state.push(new_thread)?;
1459        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1460            guest_thread,
1461            self,
1462            callee.index,
1463        )?;
1464
1465        let state = self.concurrent_state_mut();
1466        state.get_mut(guest_task)?.threads.insert(guest_thread);
1467        if guest_caller.is_some() {
1468            let thread = state.guest_thread.unwrap();
1469            state.get_mut(thread.task)?.subtasks.insert(guest_task);
1470        }
1471
1472        self.set_thread(Some(QualifiedThreadId {
1473            task: guest_task,
1474            thread: guest_thread,
1475        }));
1476
1477        Ok(())
1478    }
1479
1480    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1481    pub(crate) fn exit_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1482        let thread = self.set_thread(None).unwrap();
1483        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1484        log::trace!("exit sync call {instance:?}");
1485        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1486            self,
1487            thread,
1488            instance.index,
1489        )?;
1490
1491        let state = self.concurrent_state_mut();
1492        let task = state.get_mut(thread.task)?;
1493        let caller = match &task.caller {
1494            &Caller::Guest { thread } => {
1495                assert!(guest_caller);
1496                Some(thread)
1497            }
1498            &Caller::Host { caller, .. } => {
1499                assert!(!guest_caller);
1500                caller
1501            }
1502        };
1503        self.set_thread(caller);
1504
1505        let state = self.concurrent_state_mut();
1506        let task = state.get_mut(thread.task)?;
1507        if task.ready_to_delete() {
1508            state.delete(thread.task)?.dispose(state, thread.task)?;
1509        }
1510
1511        Ok(())
1512    }
1513
1514    /// Determine whether the specified instance may be entered from the host.
1515    ///
1516    /// We return `true` here only if all of the following hold:
1517    ///
1518    /// - The top-level instance is not already on the current task's call stack.
1519    /// - The instance is not in need of a post-return function call.
1520    /// - `self` has not been poisoned due to a trap.
1521    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1522        if !self.concurrency_support() {
1523            return self.may_enter_at_all(instance);
1524        }
1525        let state = self.concurrent_state_mut();
1526        if let Some(caller) = state.guest_thread {
1527            instance != state.get_mut(caller.task).unwrap().instance
1528                && self.may_enter_from_caller(caller.task, instance)
1529        } else {
1530            self.may_enter_at_all(instance)
1531        }
1532    }
1533
1534    /// Variation of `may_enter` which takes a `TableId<GuestTask>` representing
1535    /// the callee.
1536    fn may_enter_task(&mut self, task: TableId<GuestTask>) -> bool {
1537        let instance = self.concurrent_state_mut().get_mut(task).unwrap().instance;
1538        self.may_enter_from_caller(task, instance)
1539    }
1540
1541    /// Variation of `may_enter` which takes a `TableId<GuestTask>` representing
1542    /// the caller, plus a `RuntimeInstance` representing the callee.
1543    fn may_enter_from_caller(
1544        &mut self,
1545        mut guest_task: TableId<GuestTask>,
1546        instance: RuntimeInstance,
1547    ) -> bool {
1548        self.may_enter_at_all(instance) && {
1549            let state = self.concurrent_state_mut();
1550            let guest_instance = instance.instance;
1551            loop {
1552                // Note that we only compare top-level instance IDs here.  The
1553                // idea is that the host is not allowed to recursively enter a
1554                // top-level instance even if the specific leaf instance is not
1555                // on the stack.  This the behavior defined in the spec, and it
1556                // allows us to elide runtime checks in guest-to-guest adapters.
1557                let next_thread = match &state.get_mut(guest_task).unwrap().caller {
1558                    Caller::Host { caller: None, .. } => break true,
1559                    &Caller::Host {
1560                        caller: Some(caller),
1561                        ..
1562                    } => {
1563                        let instance = state.get_mut(caller.task).unwrap().instance;
1564                        if instance.instance == guest_instance {
1565                            break false;
1566                        } else {
1567                            caller
1568                        }
1569                    }
1570                    &Caller::Guest { thread } => {
1571                        if state.get_mut(thread.task).unwrap().instance.instance == guest_instance {
1572                            break false;
1573                        } else {
1574                            thread
1575                        }
1576                    }
1577                };
1578                guest_task = next_thread.task;
1579            }
1580        }
1581    }
1582
1583    /// Helper function to retrieve the `ConcurrentInstanceState` for the
1584    /// specified instance.
1585    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1586        self.component_instance_mut(instance.instance)
1587            .instance_state(instance.index)
1588            .concurrent_state()
1589    }
1590
1591    /// Helper function to retrieve the `HandleTable` for the specified
1592    /// instance.
1593    fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1594        self.component_instance_mut(instance.instance)
1595            .instance_state(instance.index)
1596            .handle_table()
1597    }
1598
1599    fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
1600        // Each time we switch threads, we conservatively set `task_may_block`
1601        // to `false` for the component instance we're switching away from (if
1602        // any), meaning it will be `false` for any new thread created for that
1603        // instance unless explicitly set otherwise.
1604        let state = self.concurrent_state_mut();
1605        let old_thread = state.guest_thread.take();
1606        if let Some(old_thread) = old_thread {
1607            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1608            self.component_instance_mut(instance)
1609                .set_task_may_block(false)
1610        }
1611
1612        self.concurrent_state_mut().guest_thread = thread;
1613
1614        // If we're switching to a new thread, set its component instance's
1615        // `task_may_block` according to where it left off.
1616        if thread.is_some() {
1617            self.set_task_may_block();
1618        }
1619
1620        old_thread
1621    }
1622
1623    /// Set the global variable representing whether the current task may block
1624    /// prior to entering Wasm code.
1625    fn set_task_may_block(&mut self) {
1626        let state = self.concurrent_state_mut();
1627        let guest_thread = state.guest_thread.unwrap();
1628        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1629        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1630        self.component_instance_mut(instance)
1631            .set_task_may_block(may_block)
1632    }
1633
1634    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1635        if !self.concurrency_support() {
1636            return Ok(());
1637        }
1638        let state = self.concurrent_state_mut();
1639        let task = state.guest_thread.unwrap().task;
1640        let instance = state.get_mut(task).unwrap().instance.instance;
1641        let task_may_block = self.component_instance(instance).get_task_may_block();
1642
1643        if task_may_block {
1644            Ok(())
1645        } else {
1646            Err(Trap::CannotBlockSyncTask.into())
1647        }
1648    }
1649
1650    /// Record that we're about to enter a (sub-)component instance which does
1651    /// not support more than one concurrent, stackful activation, meaning it
1652    /// cannot be entered again until the next call returns.
1653    fn enter_instance(&mut self, instance: RuntimeInstance) {
1654        log::trace!("enter {instance:?}");
1655        self.instance_state(instance).do_not_enter = true;
1656    }
1657
1658    /// Record that we've exited a (sub-)component instance previously entered
1659    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1660    /// See the documentation for the latter for details.
1661    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1662        log::trace!("exit {instance:?}");
1663        self.instance_state(instance).do_not_enter = false;
1664        self.partition_pending(instance)
1665    }
1666
1667    /// Iterate over `InstanceState::pending`, moving any ready items into the
1668    /// "high priority" work item queue.
1669    ///
1670    /// See `GuestCall::is_ready` for details.
1671    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1672        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1673            let call = GuestCall { thread, kind };
1674            if call.is_ready(self)? {
1675                self.concurrent_state_mut()
1676                    .push_high_priority(WorkItem::GuestCall(call));
1677            } else {
1678                self.instance_state(instance)
1679                    .pending
1680                    .insert(call.thread, call.kind);
1681            }
1682        }
1683
1684        Ok(())
1685    }
1686
1687    /// Implements the `backpressure.{inc,dec}` intrinsics.
1688    pub(crate) fn backpressure_modify(
1689        &mut self,
1690        caller_instance: RuntimeInstance,
1691        modify: impl FnOnce(u16) -> Option<u16>,
1692    ) -> Result<()> {
1693        let state = self.instance_state(caller_instance);
1694        let old = state.backpressure;
1695        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1696        state.backpressure = new;
1697
1698        if old > 0 && new == 0 {
1699            // Backpressure was previously enabled and is now disabled; move any
1700            // newly-eligible guest calls to the "high priority" queue.
1701            self.partition_pending(caller_instance)?;
1702        }
1703
1704        Ok(())
1705    }
1706
1707    /// Resume the specified fiber, giving it exclusive access to the specified
1708    /// store.
1709    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1710        let old_thread = self.concurrent_state_mut().guest_thread;
1711        log::trace!("resume_fiber: save current thread {old_thread:?}");
1712
1713        let fiber = fiber::resolve_or_release(self, fiber).await?;
1714
1715        self.set_thread(old_thread);
1716
1717        let state = self.concurrent_state_mut();
1718
1719        if let Some(ref ot) = old_thread {
1720            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1721        }
1722        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1723
1724        if let Some(mut fiber) = fiber {
1725            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1726            // See the `SuspendReason` documentation for what each case means.
1727            match state.suspend_reason.take().unwrap() {
1728                SuspendReason::NeedWork => {
1729                    if state.worker.is_none() {
1730                        state.worker = Some(fiber);
1731                    } else {
1732                        fiber.dispose(self);
1733                    }
1734                }
1735                SuspendReason::Yielding { thread, .. } => {
1736                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1737                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1738                }
1739                SuspendReason::ExplicitlySuspending { thread, .. } => {
1740                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1741                }
1742                SuspendReason::Waiting { set, thread, .. } => {
1743                    let old = state
1744                        .get_mut(set)?
1745                        .waiting
1746                        .insert(thread, WaitMode::Fiber(fiber));
1747                    assert!(old.is_none());
1748                }
1749            };
1750        } else {
1751            log::trace!("resume_fiber: fiber has exited");
1752        }
1753
1754        Ok(())
1755    }
1756
1757    /// Suspend the current fiber, storing the reason in
1758    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1759    /// it should be resumed.
1760    ///
1761    /// See the `SuspendReason` documentation for details.
1762    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1763        log::trace!("suspend fiber: {reason:?}");
1764
1765        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1766        // pop the call context which manages resource borrows before suspending
1767        // and then push it again once we've resumed.
1768        let task = match &reason {
1769            SuspendReason::Yielding { thread, .. }
1770            | SuspendReason::Waiting { thread, .. }
1771            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1772            SuspendReason::NeedWork => None,
1773        };
1774
1775        let old_guest_thread = if let Some(task) = task {
1776            self.maybe_pop_call_context(task)?;
1777            self.concurrent_state_mut().guest_thread
1778        } else {
1779            None
1780        };
1781
1782        // We should not have reached here unless either there's no current
1783        // task, or the current task is permitted to block.  In addition, we
1784        // special-case `thread.switch-to` and waiting for a subtask to go from
1785        // `starting` to `started`, both of which we consider non-blocking
1786        // operations despite requiring a suspend.
1787        assert!(
1788            matches!(
1789                reason,
1790                SuspendReason::ExplicitlySuspending {
1791                    skip_may_block_check: true,
1792                    ..
1793                } | SuspendReason::Waiting {
1794                    skip_may_block_check: true,
1795                    ..
1796                } | SuspendReason::Yielding {
1797                    skip_may_block_check: true,
1798                    ..
1799                }
1800            ) || old_guest_thread
1801                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1802                .unwrap_or(true)
1803        );
1804
1805        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1806        assert!(suspend_reason.is_none());
1807        *suspend_reason = Some(reason);
1808
1809        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1810
1811        if let Some(task) = task {
1812            self.set_thread(old_guest_thread);
1813            self.maybe_push_call_context(task)?;
1814        }
1815
1816        Ok(())
1817    }
1818
1819    /// Push the call context for managing resource borrows for the specified
1820    /// guest task if it has not yet either returned a result or cancelled
1821    /// itself.
1822    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1823        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1824
1825        if !task.returned_or_cancelled() {
1826            log::trace!("push call context for {guest_task:?}");
1827            let call_context = task.call_context.take().unwrap();
1828            self.component_resource_state().0.push(call_context);
1829        }
1830        Ok(())
1831    }
1832
1833    /// Pop the call context for managing resource borrows for the specified
1834    /// guest task if it has not yet either returned a result or cancelled
1835    /// itself.
1836    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1837        if !self
1838            .concurrent_state_mut()
1839            .get_mut(guest_task)?
1840            .returned_or_cancelled()
1841        {
1842            log::trace!("pop call context for {guest_task:?}");
1843            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1844            self.concurrent_state_mut()
1845                .get_mut(guest_task)?
1846                .call_context = call_context;
1847        }
1848        Ok(())
1849    }
1850
1851    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1852        let state = self.concurrent_state_mut();
1853        let caller = state.guest_thread.unwrap();
1854        let old_set = waitable.common(state)?.set;
1855        let set = state.get_mut(caller.task)?.sync_call_set;
1856        waitable.join(state, Some(set))?;
1857        self.suspend(SuspendReason::Waiting {
1858            set,
1859            thread: caller,
1860            skip_may_block_check: false,
1861        })?;
1862        let state = self.concurrent_state_mut();
1863        waitable.join(state, old_set)
1864    }
1865}
1866
1867impl Instance {
1868    fn check_may_leave(
1869        self,
1870        store: &mut StoreOpaque,
1871        caller: RuntimeComponentInstanceIndex,
1872    ) -> Result<()> {
1873        store.check_may_leave(RuntimeInstance {
1874            instance: self.id().instance(),
1875            index: caller,
1876        })
1877    }
1878
1879    /// Get the next pending event for the specified task and (optional)
1880    /// waitable set, along with the waitable handle if applicable.
1881    fn get_event(
1882        self,
1883        store: &mut StoreOpaque,
1884        guest_task: TableId<GuestTask>,
1885        set: Option<TableId<WaitableSet>>,
1886        cancellable: bool,
1887    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1888        let state = store.concurrent_state_mut();
1889
1890        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1891            log::trace!("deliver event {event:?} to {guest_task:?}");
1892
1893            if cancellable || !matches!(event, Event::Cancelled) {
1894                return Ok(Some((event, None)));
1895            } else {
1896                state.get_mut(guest_task)?.event = Some(event);
1897            }
1898        }
1899
1900        Ok(
1901            if let Some((set, waitable)) = set
1902                .and_then(|set| {
1903                    state
1904                        .get_mut(set)
1905                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1906                        .transpose()
1907                })
1908                .transpose()?
1909            {
1910                let common = waitable.common(state)?;
1911                let handle = common.handle.unwrap();
1912                let event = common.event.take().unwrap();
1913
1914                log::trace!(
1915                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1916                );
1917
1918                waitable.on_delivery(store, self, event);
1919
1920                Some((event, Some((waitable, handle))))
1921            } else {
1922                None
1923            },
1924        )
1925    }
1926
1927    /// Handle the `CallbackCode` returned from an async-lifted export or its
1928    /// callback.
1929    ///
1930    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1931    /// using `handle_guest_call`.
1932    fn handle_callback_code(
1933        self,
1934        store: &mut StoreOpaque,
1935        guest_thread: QualifiedThreadId,
1936        runtime_instance: RuntimeComponentInstanceIndex,
1937        code: u32,
1938    ) -> Result<Option<GuestCall>> {
1939        let (code, set) = unpack_callback_code(code);
1940
1941        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1942
1943        let state = store.concurrent_state_mut();
1944
1945        let get_set = |store: &mut StoreOpaque, handle| {
1946            if handle == 0 {
1947                bail!("invalid waitable-set handle");
1948            }
1949
1950            let set = store
1951                .handle_table(RuntimeInstance {
1952                    instance: self.id().instance(),
1953                    index: runtime_instance,
1954                })
1955                .waitable_set_rep(handle)?;
1956
1957            Ok(TableId::<WaitableSet>::new(set))
1958        };
1959
1960        Ok(match code {
1961            callback_code::EXIT => {
1962                log::trace!("implicit thread {guest_thread:?} completed");
1963                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1964                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1965                if task.threads.is_empty() && !task.returned_or_cancelled() {
1966                    bail!(Trap::NoAsyncResult);
1967                }
1968                match &task.caller {
1969                    Caller::Host { .. } => {
1970                        if task.ready_to_delete() {
1971                            Waitable::Guest(guest_thread.task)
1972                                .delete_from(store.concurrent_state_mut())?;
1973                        }
1974                    }
1975                    Caller::Guest { .. } => {
1976                        task.exited = true;
1977                        task.callback = None;
1978                    }
1979                }
1980                None
1981            }
1982            callback_code::YIELD => {
1983                let task = state.get_mut(guest_thread.task)?;
1984                // If an `Event::Cancelled` is pending, we'll deliver that;
1985                // otherwise, we'll deliver `Event::None`.  Note that
1986                // `GuestTask::event` is only ever set to one of those two
1987                // `Event` variants.
1988                if let Some(event) = task.event {
1989                    assert!(matches!(event, Event::None | Event::Cancelled));
1990                } else {
1991                    task.event = Some(Event::None);
1992                }
1993                let call = GuestCall {
1994                    thread: guest_thread,
1995                    kind: GuestCallKind::DeliverEvent {
1996                        instance: self,
1997                        set: None,
1998                    },
1999                };
2000                if state.may_block(guest_thread.task) {
2001                    // Push this thread onto the "low priority" queue so it runs
2002                    // after any other threads have had a chance to run.
2003                    state.push_low_priority(WorkItem::GuestCall(call));
2004                    None
2005                } else {
2006                    // Yielding in a non-blocking context is defined as a no-op
2007                    // according to the spec, so we must run this thread
2008                    // immediately without allowing any others to run.
2009                    Some(call)
2010                }
2011            }
2012            callback_code::WAIT => {
2013                // The task may only return `WAIT` if it was created for a call
2014                // to an async export).  Otherwise, we'll trap.
2015                state.check_blocking_for(guest_thread.task)?;
2016
2017                let set = get_set(store, set)?;
2018                let state = store.concurrent_state_mut();
2019
2020                if state.get_mut(guest_thread.task)?.event.is_some()
2021                    || !state.get_mut(set)?.ready.is_empty()
2022                {
2023                    // An event is immediately available; deliver it ASAP.
2024                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
2025                        thread: guest_thread,
2026                        kind: GuestCallKind::DeliverEvent {
2027                            instance: self,
2028                            set: Some(set),
2029                        },
2030                    }));
2031                } else {
2032                    // No event is immediately available.
2033                    //
2034                    // We're waiting, so register to be woken up when an event
2035                    // is published for this waitable set.
2036                    //
2037                    // Here we also set `GuestTask::wake_on_cancel` which allows
2038                    // `subtask.cancel` to interrupt the wait.
2039                    let old = state
2040                        .get_mut(guest_thread.thread)?
2041                        .wake_on_cancel
2042                        .replace(set);
2043                    assert!(old.is_none());
2044                    let old = state
2045                        .get_mut(set)?
2046                        .waiting
2047                        .insert(guest_thread, WaitMode::Callback(self));
2048                    assert!(old.is_none());
2049                }
2050                None
2051            }
2052            _ => bail!("unsupported callback code: {code}"),
2053        })
2054    }
2055
2056    fn cleanup_thread(
2057        self,
2058        store: &mut StoreOpaque,
2059        guest_thread: QualifiedThreadId,
2060        runtime_instance: RuntimeComponentInstanceIndex,
2061    ) -> Result<()> {
2062        let guest_id = store
2063            .concurrent_state_mut()
2064            .get_mut(guest_thread.thread)?
2065            .instance_rep;
2066        store
2067            .handle_table(RuntimeInstance {
2068                instance: self.id().instance(),
2069                index: runtime_instance,
2070            })
2071            .guest_thread_remove(guest_id.unwrap())?;
2072
2073        store.concurrent_state_mut().delete(guest_thread.thread)?;
2074        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2075        task.threads.remove(&guest_thread.thread);
2076        Ok(())
2077    }
2078
2079    /// Add the specified guest call to the "high priority" work item queue, to
2080    /// be started as soon as backpressure and/or reentrance rules allow.
2081    ///
2082    /// SAFETY: The raw pointer arguments must be valid references to guest
2083    /// functions (with the appropriate signatures) when the closures queued by
2084    /// this function are called.
2085    unsafe fn queue_call<T: 'static>(
2086        self,
2087        mut store: StoreContextMut<T>,
2088        guest_thread: QualifiedThreadId,
2089        callee: SendSyncPtr<VMFuncRef>,
2090        param_count: usize,
2091        result_count: usize,
2092        async_: bool,
2093        callback: Option<SendSyncPtr<VMFuncRef>>,
2094        post_return: Option<SendSyncPtr<VMFuncRef>>,
2095    ) -> Result<()> {
2096        /// Return a closure which will call the specified function in the scope
2097        /// of the specified task.
2098        ///
2099        /// This will use `GuestTask::lower_params` to lower the parameters, but
2100        /// will not lift the result; instead, it returns a
2101        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2102        /// any, may be lifted.  Note that an async-lifted export will have
2103        /// returned its result using the `task.return` intrinsic (or not
2104        /// returned a result at all, in the case of `task.cancel`), in which
2105        /// case the "result" of this call will either be a callback code or
2106        /// nothing.
2107        ///
2108        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2109        /// the returned closure is called.
2110        unsafe fn make_call<T: 'static>(
2111            store: StoreContextMut<T>,
2112            guest_thread: QualifiedThreadId,
2113            callee: SendSyncPtr<VMFuncRef>,
2114            param_count: usize,
2115            result_count: usize,
2116        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2117        + Send
2118        + Sync
2119        + 'static
2120        + use<T> {
2121            let token = StoreToken::new(store);
2122            move |store: &mut dyn VMStore| {
2123                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2124
2125                store
2126                    .concurrent_state_mut()
2127                    .get_mut(guest_thread.thread)?
2128                    .state = GuestThreadState::Running;
2129                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2130                let lower = task.lower_params.take().unwrap();
2131
2132                lower(store, &mut storage[..param_count])?;
2133
2134                let mut store = token.as_context_mut(store);
2135
2136                // SAFETY: Per the contract documented in `make_call's`
2137                // documentation, `callee` must be a valid pointer.
2138                unsafe {
2139                    crate::Func::call_unchecked_raw(
2140                        &mut store,
2141                        callee.as_non_null(),
2142                        NonNull::new(
2143                            &mut storage[..param_count.max(result_count)]
2144                                as *mut [MaybeUninit<ValRaw>] as _,
2145                        )
2146                        .unwrap(),
2147                    )?;
2148                }
2149
2150                Ok(storage)
2151            }
2152        }
2153
2154        // SAFETY: Per the contract described in this function documentation,
2155        // the `callee` pointer which `call` closes over must be valid when
2156        // called by the closure we queue below.
2157        let call = unsafe {
2158            make_call(
2159                store.as_context_mut(),
2160                guest_thread,
2161                callee,
2162                param_count,
2163                result_count,
2164            )
2165        };
2166
2167        let callee_instance = store
2168            .0
2169            .concurrent_state_mut()
2170            .get_mut(guest_thread.task)?
2171            .instance;
2172
2173        let fun = if callback.is_some() {
2174            assert!(async_);
2175
2176            Box::new(move |store: &mut dyn VMStore| {
2177                self.add_guest_thread_to_instance_table(
2178                    guest_thread.thread,
2179                    store,
2180                    callee_instance.index,
2181                )?;
2182                let old_thread = store.set_thread(Some(guest_thread));
2183                log::trace!(
2184                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2185                );
2186
2187                store.maybe_push_call_context(guest_thread.task)?;
2188
2189                store.enter_instance(callee_instance);
2190
2191                // SAFETY: See the documentation for `make_call` to review the
2192                // contract we must uphold for `call` here.
2193                //
2194                // Per the contract described in the `queue_call`
2195                // documentation, the `callee` pointer which `call` closes
2196                // over must be valid.
2197                let storage = call(store)?;
2198
2199                store.exit_instance(callee_instance)?;
2200
2201                store.maybe_pop_call_context(guest_thread.task)?;
2202
2203                store.set_thread(old_thread);
2204                let state = store.concurrent_state_mut();
2205                old_thread
2206                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2207                log::trace!("stackless call: restored {old_thread:?} as current thread");
2208
2209                // SAFETY: `wasmparser` will have validated that the callback
2210                // function returns a `i32` result.
2211                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2212
2213                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2214            })
2215                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2216        } else {
2217            let token = StoreToken::new(store.as_context_mut());
2218            Box::new(move |store: &mut dyn VMStore| {
2219                self.add_guest_thread_to_instance_table(
2220                    guest_thread.thread,
2221                    store,
2222                    callee_instance.index,
2223                )?;
2224                let old_thread = store.set_thread(Some(guest_thread));
2225                log::trace!(
2226                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2227                );
2228                let mut flags = self.id().get(store).instance_flags(callee_instance.index);
2229
2230                store.maybe_push_call_context(guest_thread.task)?;
2231
2232                // Unless this is a callback-less (i.e. stackful)
2233                // async-lifted export, we need to record that the instance
2234                // cannot be entered until the call returns.
2235                if !async_ {
2236                    store.enter_instance(callee_instance);
2237                }
2238
2239                // SAFETY: See the documentation for `make_call` to review the
2240                // contract we must uphold for `call` here.
2241                //
2242                // Per the contract described in the `queue_call`
2243                // documentation, the `callee` pointer which `call` closes
2244                // over must be valid.
2245                let storage = call(store)?;
2246
2247                // This is a callback-less call, so the implicit thread has now completed
2248                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2249
2250                if async_ {
2251                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2252                    if task.threads.is_empty() && !task.returned_or_cancelled() {
2253                        bail!(Trap::NoAsyncResult);
2254                    }
2255                } else {
2256                    // This is a sync-lifted export, so now is when we lift the
2257                    // result, optionally call the post-return function, if any,
2258                    // and finally notify any current or future waiters that the
2259                    // subtask has returned.
2260
2261                    let lift = {
2262                        store.exit_instance(callee_instance)?;
2263
2264                        let state = store.concurrent_state_mut();
2265                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2266
2267                        state
2268                            .get_mut(guest_thread.task)?
2269                            .lift_result
2270                            .take()
2271                            .unwrap()
2272                    };
2273
2274                    // SAFETY: `result_count` represents the number of core Wasm
2275                    // results returned, per `wasmparser`.
2276                    let result = (lift.lift)(store, unsafe {
2277                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2278                            &storage[..result_count],
2279                        )
2280                    })?;
2281
2282                    let post_return_arg = match result_count {
2283                        0 => ValRaw::i32(0),
2284                        // SAFETY: `result_count` represents the number of
2285                        // core Wasm results returned, per `wasmparser`.
2286                        1 => unsafe { storage[0].assume_init() },
2287                        _ => unreachable!(),
2288                    };
2289
2290                    if store
2291                        .concurrent_state_mut()
2292                        .get_mut(guest_thread.task)?
2293                        .call_post_return_automatically()
2294                    {
2295                        unsafe {
2296                            flags.set_may_leave(false);
2297                            flags.set_needs_post_return(false);
2298                        }
2299
2300                        if let Some(func) = post_return {
2301                            let mut store = token.as_context_mut(store);
2302
2303                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
2304                            // either `wasmtime-cranelift`-generated fused adapter
2305                            // code or `component::Options`.  Per `wasmparser`
2306                            // post-return signature validation, we know it takes a
2307                            // single parameter.
2308                            unsafe {
2309                                crate::Func::call_unchecked_raw(
2310                                    &mut store,
2311                                    func.as_non_null(),
2312                                    slice::from_ref(&post_return_arg).into(),
2313                                )?;
2314                            }
2315                        }
2316
2317                        unsafe {
2318                            flags.set_may_leave(true);
2319                        }
2320                    }
2321
2322                    self.task_complete(
2323                        store,
2324                        guest_thread.task,
2325                        result,
2326                        Status::Returned,
2327                        post_return_arg,
2328                    )?;
2329                }
2330
2331                store.set_thread(old_thread);
2332
2333                store.maybe_pop_call_context(guest_thread.task)?;
2334
2335                let state = store.concurrent_state_mut();
2336                let task = state.get_mut(guest_thread.task)?;
2337
2338                match &task.caller {
2339                    Caller::Host { .. } => {
2340                        if task.ready_to_delete() {
2341                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2342                        }
2343                    }
2344                    Caller::Guest { .. } => {
2345                        task.exited = true;
2346                    }
2347                }
2348
2349                Ok(None)
2350            })
2351        };
2352
2353        store
2354            .0
2355            .concurrent_state_mut()
2356            .push_high_priority(WorkItem::GuestCall(GuestCall {
2357                thread: guest_thread,
2358                kind: GuestCallKind::StartImplicit(fun),
2359            }));
2360
2361        Ok(())
2362    }
2363
2364    /// Prepare (but do not start) a guest->guest call.
2365    ///
2366    /// This is called from fused adapter code generated in
2367    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2368    /// are synthesized Wasm functions which move the parameters from the caller
2369    /// to the callee and the result from the callee to the caller,
2370    /// respectively.  The adapter will call `Self::start_call` immediately
2371    /// after calling this function.
2372    ///
2373    /// SAFETY: All the pointer arguments must be valid pointers to guest
2374    /// entities (and with the expected signatures for the function references
2375    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2376    unsafe fn prepare_call<T: 'static>(
2377        self,
2378        mut store: StoreContextMut<T>,
2379        start: *mut VMFuncRef,
2380        return_: *mut VMFuncRef,
2381        caller_instance: RuntimeComponentInstanceIndex,
2382        callee_instance: RuntimeComponentInstanceIndex,
2383        task_return_type: TypeTupleIndex,
2384        callee_async: bool,
2385        memory: *mut VMMemoryDefinition,
2386        string_encoding: u8,
2387        caller_info: CallerInfo,
2388    ) -> Result<()> {
2389        self.check_may_leave(store.0, caller_instance)?;
2390
2391        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2392            // A task may only call an async-typed function via a sync lower if
2393            // it was created by a call to an async export.  Otherwise, we'll
2394            // trap.
2395            store.0.check_blocking()?;
2396        }
2397
2398        enum ResultInfo {
2399            Heap { results: u32 },
2400            Stack { result_count: u32 },
2401        }
2402
2403        let result_info = match &caller_info {
2404            CallerInfo::Async {
2405                has_result: true,
2406                params,
2407            } => ResultInfo::Heap {
2408                results: params.last().unwrap().get_u32(),
2409            },
2410            CallerInfo::Async {
2411                has_result: false, ..
2412            } => ResultInfo::Stack { result_count: 0 },
2413            CallerInfo::Sync {
2414                result_count,
2415                params,
2416            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2417                results: params.last().unwrap().get_u32(),
2418            },
2419            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2420                result_count: *result_count,
2421            },
2422        };
2423
2424        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2425
2426        // Create a new guest task for the call, closing over the `start` and
2427        // `return_` functions to lift the parameters and lower the result,
2428        // respectively.
2429        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2430        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2431        let token = StoreToken::new(store.as_context_mut());
2432        let state = store.0.concurrent_state_mut();
2433        let old_thread = state.guest_thread.unwrap();
2434
2435        assert_eq!(
2436            state.get_mut(old_thread.task)?.instance,
2437            RuntimeInstance {
2438                instance: self.id().instance(),
2439                index: caller_instance,
2440            }
2441        );
2442
2443        let new_task = GuestTask::new(
2444            state,
2445            Box::new(move |store, dst| {
2446                let mut store = token.as_context_mut(store);
2447                assert!(dst.len() <= MAX_FLAT_PARAMS);
2448                // The `+ 1` here accounts for the return pointer, if any:
2449                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2450                let count = match caller_info {
2451                    // Async callers, if they have a result, use the last
2452                    // parameter as a return pointer so chop that off if
2453                    // relevant here.
2454                    CallerInfo::Async { params, has_result } => {
2455                        let params = &params[..params.len() - usize::from(has_result)];
2456                        for (param, src) in params.iter().zip(&mut src) {
2457                            src.write(*param);
2458                        }
2459                        params.len()
2460                    }
2461
2462                    // Sync callers forward everything directly.
2463                    CallerInfo::Sync { params, .. } => {
2464                        for (param, src) in params.iter().zip(&mut src) {
2465                            src.write(*param);
2466                        }
2467                        params.len()
2468                    }
2469                };
2470                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2471                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2472                // how it was constructed (see
2473                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2474                // for details) we know it takes count parameters and returns
2475                // `dst.len()` results.
2476                unsafe {
2477                    crate::Func::call_unchecked_raw(
2478                        &mut store,
2479                        start.as_non_null(),
2480                        NonNull::new(
2481                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2482                        )
2483                        .unwrap(),
2484                    )?;
2485                }
2486                dst.copy_from_slice(&src[..dst.len()]);
2487                let state = store.0.concurrent_state_mut();
2488                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2489                    state,
2490                    Some(Event::Subtask {
2491                        status: Status::Started,
2492                    }),
2493                )?;
2494                Ok(())
2495            }),
2496            LiftResult {
2497                lift: Box::new(move |store, src| {
2498                    // SAFETY: See comment in closure passed as `lower_params`
2499                    // parameter above.
2500                    let mut store = token.as_context_mut(store);
2501                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2502                    if let ResultInfo::Heap { results } = &result_info {
2503                        my_src.push(ValRaw::u32(*results));
2504                    }
2505                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2506                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2507                    // on how it was constructed (see
2508                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2509                    // for details) we know it takes `src.len()` parameters and
2510                    // returns up to 1 result.
2511                    unsafe {
2512                        crate::Func::call_unchecked_raw(
2513                            &mut store,
2514                            return_.as_non_null(),
2515                            my_src.as_mut_slice().into(),
2516                        )?;
2517                    }
2518                    let state = store.0.concurrent_state_mut();
2519                    let thread = state.guest_thread.unwrap();
2520                    if sync_caller {
2521                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2522                            if let ResultInfo::Stack { result_count } = &result_info {
2523                                match result_count {
2524                                    0 => None,
2525                                    1 => Some(my_src[0]),
2526                                    _ => unreachable!(),
2527                                }
2528                            } else {
2529                                None
2530                            },
2531                        );
2532                    }
2533                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2534                }),
2535                ty: task_return_type,
2536                memory: NonNull::new(memory).map(SendSyncPtr::new),
2537                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2538            },
2539            Caller::Guest { thread: old_thread },
2540            None,
2541            RuntimeInstance {
2542                instance: self.id().instance(),
2543                index: callee_instance,
2544            },
2545            callee_async,
2546        )?;
2547
2548        let guest_task = state.push(new_task)?;
2549        let new_thread = GuestThread::new_implicit(guest_task);
2550        let guest_thread = state.push(new_thread)?;
2551        state.get_mut(guest_task)?.threads.insert(guest_thread);
2552
2553        store
2554            .0
2555            .concurrent_state_mut()
2556            .get_mut(old_thread.task)?
2557            .subtasks
2558            .insert(guest_task);
2559
2560        // Make the new thread the current one so that `Self::start_call` knows
2561        // which one to start.
2562        store.0.set_thread(Some(QualifiedThreadId {
2563            task: guest_task,
2564            thread: guest_thread,
2565        }));
2566        log::trace!(
2567            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2568        );
2569
2570        Ok(())
2571    }
2572
2573    /// Call the specified callback function for an async-lifted export.
2574    ///
2575    /// SAFETY: `function` must be a valid reference to a guest function of the
2576    /// correct signature for a callback.
2577    unsafe fn call_callback<T>(
2578        self,
2579        mut store: StoreContextMut<T>,
2580        function: SendSyncPtr<VMFuncRef>,
2581        event: Event,
2582        handle: u32,
2583    ) -> Result<u32> {
2584        let (ordinal, result) = event.parts();
2585        let params = &mut [
2586            ValRaw::u32(ordinal),
2587            ValRaw::u32(handle),
2588            ValRaw::u32(result),
2589        ];
2590        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2591        // `wasmtime-cranelift`-generated fused adapter code or
2592        // `component::Options`.  Per `wasmparser` callback signature
2593        // validation, we know it takes three parameters and returns one.
2594        unsafe {
2595            crate::Func::call_unchecked_raw(
2596                &mut store,
2597                function.as_non_null(),
2598                params.as_mut_slice().into(),
2599            )?;
2600        }
2601        Ok(params[0].get_u32())
2602    }
2603
2604    /// Start a guest->guest call previously prepared using
2605    /// `Self::prepare_call`.
2606    ///
2607    /// This is called from fused adapter code generated in
2608    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2609    /// this function immediately after calling `Self::prepare_call`.
2610    ///
2611    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2612    /// functions with the appropriate signatures for the current guest task.
2613    /// If this is a call to an async-lowered import, the actual call may be
2614    /// deferred and run after this function returns, in which case the pointer
2615    /// arguments must also be valid when the call happens.
2616    unsafe fn start_call<T: 'static>(
2617        self,
2618        mut store: StoreContextMut<T>,
2619        callback: *mut VMFuncRef,
2620        post_return: *mut VMFuncRef,
2621        callee: *mut VMFuncRef,
2622        param_count: u32,
2623        result_count: u32,
2624        flags: u32,
2625        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2626    ) -> Result<u32> {
2627        let token = StoreToken::new(store.as_context_mut());
2628        let async_caller = storage.is_none();
2629        let state = store.0.concurrent_state_mut();
2630        let guest_thread = state.guest_thread.unwrap();
2631        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2632        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2633        let param_count = usize::try_from(param_count).unwrap();
2634        assert!(param_count <= MAX_FLAT_PARAMS);
2635        let result_count = usize::try_from(result_count).unwrap();
2636        assert!(result_count <= MAX_FLAT_RESULTS);
2637
2638        let task = state.get_mut(guest_thread.task)?;
2639        if !callback.is_null() {
2640            // We're calling an async-lifted export with a callback, so store
2641            // the callback and related context as part of the task so we can
2642            // call it later when needed.
2643            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2644            task.callback = Some(Box::new(move |store, event, handle| {
2645                let store = token.as_context_mut(store);
2646                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2647            }));
2648        }
2649
2650        let Caller::Guest { thread: caller } = &task.caller else {
2651            // As of this writing, `start_call` is only used for guest->guest
2652            // calls.
2653            unreachable!()
2654        };
2655        let caller = *caller;
2656        let caller_instance = state.get_mut(caller.task)?.instance;
2657
2658        // Queue the call as a "high priority" work item.
2659        unsafe {
2660            self.queue_call(
2661                store.as_context_mut(),
2662                guest_thread,
2663                callee,
2664                param_count,
2665                result_count,
2666                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2667                NonNull::new(callback).map(SendSyncPtr::new),
2668                NonNull::new(post_return).map(SendSyncPtr::new),
2669            )?;
2670        }
2671
2672        let state = store.0.concurrent_state_mut();
2673
2674        // Use the caller's `GuestTask::sync_call_set` to register interest in
2675        // the subtask...
2676        let guest_waitable = Waitable::Guest(guest_thread.task);
2677        let old_set = guest_waitable.common(state)?.set;
2678        let set = state.get_mut(caller.task)?.sync_call_set;
2679        guest_waitable.join(state, Some(set))?;
2680
2681        // ... and suspend this fiber temporarily while we wait for it to start.
2682        //
2683        // Note that we _could_ call the callee directly using the current fiber
2684        // rather than suspend this one, but that would make reasoning about the
2685        // event loop more complicated and is probably only worth doing if
2686        // there's a measurable performance benefit.  In addition, it would mean
2687        // blocking the caller if the callee calls a blocking sync-lowered
2688        // import, and as of this writing the spec says we must not do that.
2689        //
2690        // Alternatively, the fused adapter code could be modified to call the
2691        // callee directly without calling a host-provided intrinsic at all (in
2692        // which case it would need to do its own, inline backpressure checks,
2693        // etc.).  Again, we'd want to see a measurable performance benefit
2694        // before committing to such an optimization.  And again, we'd need to
2695        // update the spec to allow that.
2696        let (status, waitable) = loop {
2697            store.0.suspend(SuspendReason::Waiting {
2698                set,
2699                thread: caller,
2700                // Normally, `StoreOpaque::suspend` would assert it's being
2701                // called from a context where blocking is allowed.  However, if
2702                // `async_caller` is `true`, we'll only "block" long enough for
2703                // the callee to start, i.e. we won't repeat this loop, so we
2704                // tell `suspend` it's okay even if we're not allowed to block.
2705                // Alternatively, if the callee is not an async function, then
2706                // we know it won't block anyway.
2707                skip_may_block_check: async_caller || !callee_async,
2708            })?;
2709
2710            let state = store.0.concurrent_state_mut();
2711
2712            log::trace!("taking event for {:?}", guest_thread.task);
2713            let event = guest_waitable.take_event(state)?;
2714            let Some(Event::Subtask { status }) = event else {
2715                unreachable!();
2716            };
2717
2718            log::trace!("status {status:?} for {:?}", guest_thread.task);
2719
2720            if status == Status::Returned {
2721                // It returned, so we can stop waiting.
2722                break (status, None);
2723            } else if async_caller {
2724                // It hasn't returned yet, but the caller is calling via an
2725                // async-lowered import, so we generate a handle for the task
2726                // waitable and return the status.
2727                let handle = store
2728                    .0
2729                    .handle_table(caller_instance)
2730                    .subtask_insert_guest(guest_thread.task.rep())?;
2731                store
2732                    .0
2733                    .concurrent_state_mut()
2734                    .get_mut(guest_thread.task)?
2735                    .common
2736                    .handle = Some(handle);
2737                break (status, Some(handle));
2738            } else {
2739                // The callee hasn't returned yet, and the caller is calling via
2740                // a sync-lowered import, so we loop and keep waiting until the
2741                // callee returns.
2742            }
2743        };
2744
2745        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2746
2747        // Reset the current thread to point to the caller as it resumes control.
2748        store.0.set_thread(Some(caller));
2749        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2750        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2751
2752        if let Some(storage) = storage {
2753            // The caller used a sync-lowered import to call an async-lifted
2754            // export, in which case the result, if any, has been stashed in
2755            // `GuestTask::sync_result`.
2756            let state = store.0.concurrent_state_mut();
2757            let task = state.get_mut(guest_thread.task)?;
2758            if let Some(result) = task.sync_result.take() {
2759                if let Some(result) = result {
2760                    storage[0] = MaybeUninit::new(result);
2761                }
2762
2763                if task.exited && task.ready_to_delete() {
2764                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2765                }
2766            }
2767        }
2768
2769        Ok(status.pack(waitable))
2770    }
2771
2772    /// Poll the specified future once on behalf of a guest->host call using an
2773    /// async-lowered import.
2774    ///
2775    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2776    /// `Pending`, add it to the set of futures to be polled as part of this
2777    /// instance's event loop until it completes, and then return
2778    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2779    ///
2780    /// Whether the future returns `Ready` immediately or later, the `lower`
2781    /// function will be used to lower the result, if any, into the guest caller's
2782    /// stack and linear memory unless the task has been cancelled.
2783    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2784        self,
2785        mut store: StoreContextMut<'_, T>,
2786        future: impl Future<Output = Result<R>> + Send + 'static,
2787        caller_instance: RuntimeComponentInstanceIndex,
2788        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2789    ) -> Result<Option<u32>> {
2790        let token = StoreToken::new(store.as_context_mut());
2791        let state = store.0.concurrent_state_mut();
2792        let caller = state.guest_thread.unwrap();
2793
2794        // Create an abortable future which hooks calls to poll and manages call
2795        // context state for the future.
2796        let (join_handle, future) = JoinHandle::run(async move {
2797            let mut future = pin!(future);
2798            let mut call_context = None;
2799            future::poll_fn(move |cx| {
2800                // Push the call context for managing any resource borrows
2801                // for the task.
2802                tls::get(|store| {
2803                    if let Some(call_context) = call_context.take() {
2804                        token
2805                            .as_context_mut(store)
2806                            .0
2807                            .component_resource_state()
2808                            .0
2809                            .push(call_context);
2810                    }
2811                });
2812
2813                let result = future.as_mut().poll(cx);
2814
2815                if result.is_pending() {
2816                    // Pop the call context for managing any resource
2817                    // borrows for the task.
2818                    tls::get(|store| {
2819                        call_context = Some(
2820                            token
2821                                .as_context_mut(store)
2822                                .0
2823                                .component_resource_state()
2824                                .0
2825                                .pop()
2826                                .unwrap(),
2827                        );
2828                    });
2829                }
2830                result
2831            })
2832            .await
2833        });
2834
2835        // We create a new host task even though it might complete immediately
2836        // (in which case we won't need to pass a waitable back to the guest).
2837        // If it does complete immediately, we'll remove it before we return.
2838        let task = state.push(HostTask::new(
2839            RuntimeInstance {
2840                instance: self.id().instance(),
2841                index: caller_instance,
2842            },
2843            Some(join_handle),
2844        ))?;
2845
2846        log::trace!("new host task child of {caller:?}: {task:?}");
2847
2848        let mut future = Box::pin(future);
2849
2850        // Finally, poll the future.  We can use a dummy `Waker` here because
2851        // we'll add the future to `ConcurrentState::futures` and poll it
2852        // automatically from the event loop if it doesn't complete immediately
2853        // here.
2854        let poll = tls::set(store.0, || {
2855            future
2856                .as_mut()
2857                .poll(&mut Context::from_waker(&Waker::noop()))
2858        });
2859
2860        Ok(match poll {
2861            Poll::Ready(None) => unreachable!(),
2862            Poll::Ready(Some(result)) => {
2863                // It finished immediately; lower the result and delete the
2864                // task.
2865                lower(store.as_context_mut(), result?)?;
2866                log::trace!("delete host task {task:?} (already ready)");
2867                store.0.concurrent_state_mut().delete(task)?;
2868                None
2869            }
2870            Poll::Pending => {
2871                // It hasn't finished yet; add the future to
2872                // `ConcurrentState::futures` so it will be polled by the event
2873                // loop and allocate a waitable handle to return to the guest.
2874
2875                // Wrap the future in a closure responsible for lowering the result into
2876                // the guest's stack and memory, as well as notifying any waiters that
2877                // the task returned.
2878                let future =
2879                    Box::pin(async move {
2880                        let result = match future.await {
2881                            Some(result) => result?,
2882                            // Task was cancelled; nothing left to do.
2883                            None => return Ok(()),
2884                        };
2885                        tls::get(move |store| {
2886                            // Here we schedule a task to run on a worker fiber to do
2887                            // the lowering since it may involve a call to the guest's
2888                            // realloc function.  This is necessary because calling the
2889                            // guest while there are host embedder frames on the stack
2890                            // is unsound.
2891                            store.concurrent_state_mut().push_high_priority(
2892                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2893                                    lower(token.as_context_mut(store), result)?;
2894                                    let state = store.concurrent_state_mut();
2895                                    state.get_mut(task)?.join_handle.take();
2896                                    Waitable::Host(task).set_event(
2897                                        state,
2898                                        Some(Event::Subtask {
2899                                            status: Status::Returned,
2900                                        }),
2901                                    )
2902                                }))),
2903                            );
2904                            Ok(())
2905                        })
2906                    });
2907
2908                store.0.concurrent_state_mut().push_future(future);
2909                let handle = store
2910                    .0
2911                    .handle_table(RuntimeInstance {
2912                        instance: self.id().instance(),
2913                        index: caller_instance,
2914                    })
2915                    .subtask_insert_host(task.rep())?;
2916                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2917                log::trace!(
2918                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2919                );
2920                Some(handle)
2921            }
2922        })
2923    }
2924
2925    /// Implements the `task.return` intrinsic, lifting the result for the
2926    /// current guest task.
2927    pub(crate) fn task_return(
2928        self,
2929        store: &mut dyn VMStore,
2930        caller: RuntimeComponentInstanceIndex,
2931        ty: TypeTupleIndex,
2932        options: OptionsIndex,
2933        storage: &[ValRaw],
2934    ) -> Result<()> {
2935        self.check_may_leave(store, caller)?;
2936
2937        let state = store.concurrent_state_mut();
2938        let guest_thread = state.guest_thread.unwrap();
2939        let lift = state
2940            .get_mut(guest_thread.task)?
2941            .lift_result
2942            .take()
2943            .ok_or_else(|| {
2944                format_err!("`task.return` or `task.cancel` called more than once for current task")
2945            })?;
2946        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2947
2948        let CanonicalOptions {
2949            string_encoding,
2950            data_model,
2951            ..
2952        } = &self.id().get(store).component().env_component().options[options];
2953
2954        let invalid = ty != lift.ty
2955            || string_encoding != &lift.string_encoding
2956            || match data_model {
2957                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2958                    Some(memory) => {
2959                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2960                        let actual = self.id().get(store).runtime_memory(memory);
2961                        expected != actual.as_ptr()
2962                    }
2963                    // Memory not specified, meaning it didn't need to be
2964                    // specified per validation, so not invalid.
2965                    None => false,
2966                },
2967                // Always invalid as this isn't supported.
2968                CanonicalOptionsDataModel::Gc { .. } => true,
2969            };
2970
2971        if invalid {
2972            bail!("invalid `task.return` signature and/or options for current task");
2973        }
2974
2975        log::trace!("task.return for {guest_thread:?}");
2976
2977        let result = (lift.lift)(store, storage)?;
2978        self.task_complete(
2979            store,
2980            guest_thread.task,
2981            result,
2982            Status::Returned,
2983            ValRaw::i32(0),
2984        )
2985    }
2986
2987    /// Implements the `task.cancel` intrinsic.
2988    pub(crate) fn task_cancel(
2989        self,
2990        store: &mut StoreOpaque,
2991        caller: RuntimeComponentInstanceIndex,
2992    ) -> Result<()> {
2993        self.check_may_leave(store, caller)?;
2994
2995        let state = store.concurrent_state_mut();
2996        let guest_thread = state.guest_thread.unwrap();
2997        let task = state.get_mut(guest_thread.task)?;
2998        if !task.cancel_sent {
2999            bail!("`task.cancel` called by task which has not been cancelled")
3000        }
3001        _ = task.lift_result.take().ok_or_else(|| {
3002            format_err!("`task.return` or `task.cancel` called more than once for current task")
3003        })?;
3004
3005        assert!(task.result.is_none());
3006
3007        log::trace!("task.cancel for {guest_thread:?}");
3008
3009        self.task_complete(
3010            store,
3011            guest_thread.task,
3012            Box::new(DummyResult),
3013            Status::ReturnCancelled,
3014            ValRaw::i32(0),
3015        )
3016    }
3017
3018    /// Complete the specified guest task (i.e. indicate that it has either
3019    /// returned a (possibly empty) result or cancelled itself).
3020    ///
3021    /// This will return any resource borrows and notify any current or future
3022    /// waiters that the task has completed.
3023    fn task_complete(
3024        self,
3025        store: &mut StoreOpaque,
3026        guest_task: TableId<GuestTask>,
3027        result: Box<dyn Any + Send + Sync>,
3028        status: Status,
3029        post_return_arg: ValRaw,
3030    ) -> Result<()> {
3031        if store
3032            .concurrent_state_mut()
3033            .get_mut(guest_task)?
3034            .call_post_return_automatically()
3035        {
3036            let (calls, host_table, _, instance) =
3037                store.component_resource_state_with_instance(self);
3038            ResourceTables {
3039                calls,
3040                host_table: Some(host_table),
3041                guest: Some(instance.instance_states()),
3042            }
3043            .exit_call()?;
3044        } else {
3045            // As of this writing, the only scenario where `call_post_return_automatically`
3046            // would be false for a `GuestTask` is for host-to-guest calls using
3047            // `[Typed]Func::call_async`, in which case the `function_index`
3048            // should be a non-`None` value.
3049            let function_index = store
3050                .concurrent_state_mut()
3051                .get_mut(guest_task)?
3052                .function_index
3053                .unwrap();
3054            self.id()
3055                .get_mut(store)
3056                .post_return_arg_set(function_index, post_return_arg);
3057        }
3058
3059        let state = store.concurrent_state_mut();
3060        let task = state.get_mut(guest_task)?;
3061
3062        if let Caller::Host { tx, .. } = &mut task.caller {
3063            if let Some(tx) = tx.take() {
3064                _ = tx.send(result);
3065            }
3066        } else {
3067            task.result = Some(result);
3068            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3069        }
3070
3071        Ok(())
3072    }
3073
3074    /// Implements the `waitable-set.new` intrinsic.
3075    pub(crate) fn waitable_set_new(
3076        self,
3077        store: &mut StoreOpaque,
3078        caller_instance: RuntimeComponentInstanceIndex,
3079    ) -> Result<u32> {
3080        self.check_may_leave(store, caller_instance)?;
3081
3082        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3083        let handle = store
3084            .handle_table(RuntimeInstance {
3085                instance: self.id().instance(),
3086                index: caller_instance,
3087            })
3088            .waitable_set_insert(set.rep())?;
3089        log::trace!("new waitable set {set:?} (handle {handle})");
3090        Ok(handle)
3091    }
3092
3093    /// Implements the `waitable-set.drop` intrinsic.
3094    pub(crate) fn waitable_set_drop(
3095        self,
3096        store: &mut StoreOpaque,
3097        caller_instance: RuntimeComponentInstanceIndex,
3098        set: u32,
3099    ) -> Result<()> {
3100        self.check_may_leave(store, caller_instance)?;
3101
3102        let rep = store
3103            .handle_table(RuntimeInstance {
3104                instance: self.id().instance(),
3105                index: caller_instance,
3106            })
3107            .waitable_set_remove(set)?;
3108
3109        log::trace!("drop waitable set {rep} (handle {set})");
3110
3111        let set = store
3112            .concurrent_state_mut()
3113            .delete(TableId::<WaitableSet>::new(rep))?;
3114
3115        if !set.waiting.is_empty() {
3116            bail!("cannot drop waitable set with waiters");
3117        }
3118
3119        Ok(())
3120    }
3121
3122    /// Implements the `waitable.join` intrinsic.
3123    pub(crate) fn waitable_join(
3124        self,
3125        store: &mut StoreOpaque,
3126        caller_instance: RuntimeComponentInstanceIndex,
3127        waitable_handle: u32,
3128        set_handle: u32,
3129    ) -> Result<()> {
3130        self.check_may_leave(store, caller_instance)?;
3131
3132        let mut instance = self.id().get_mut(store);
3133        let waitable =
3134            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3135
3136        let set = if set_handle == 0 {
3137            None
3138        } else {
3139            let set = instance.instance_states().0[caller_instance]
3140                .handle_table()
3141                .waitable_set_rep(set_handle)?;
3142
3143            Some(TableId::<WaitableSet>::new(set))
3144        };
3145
3146        log::trace!(
3147            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3148        );
3149
3150        waitable.join(store.concurrent_state_mut(), set)
3151    }
3152
3153    /// Implements the `subtask.drop` intrinsic.
3154    pub(crate) fn subtask_drop(
3155        self,
3156        store: &mut StoreOpaque,
3157        caller_instance: RuntimeComponentInstanceIndex,
3158        task_id: u32,
3159    ) -> Result<()> {
3160        self.check_may_leave(store, caller_instance)?;
3161
3162        self.waitable_join(store, caller_instance, task_id, 0)?;
3163
3164        let (rep, is_host) = store
3165            .handle_table(RuntimeInstance {
3166                instance: self.id().instance(),
3167                index: caller_instance,
3168            })
3169            .subtask_remove(task_id)?;
3170
3171        let concurrent_state = store.concurrent_state_mut();
3172        let (waitable, expected_caller_instance, delete) = if is_host {
3173            let id = TableId::<HostTask>::new(rep);
3174            let task = concurrent_state.get_mut(id)?;
3175            if task.join_handle.is_some() {
3176                bail!("cannot drop a subtask which has not yet resolved");
3177            }
3178            (Waitable::Host(id), task.caller_instance, true)
3179        } else {
3180            let id = TableId::<GuestTask>::new(rep);
3181            let task = concurrent_state.get_mut(id)?;
3182            if task.lift_result.is_some() {
3183                bail!("cannot drop a subtask which has not yet resolved");
3184            }
3185            if let &Caller::Guest { thread } = &task.caller {
3186                (
3187                    Waitable::Guest(id),
3188                    concurrent_state.get_mut(thread.task)?.instance,
3189                    concurrent_state.get_mut(id)?.exited,
3190                )
3191            } else {
3192                unreachable!()
3193            }
3194        };
3195
3196        waitable.common(concurrent_state)?.handle = None;
3197
3198        if waitable.take_event(concurrent_state)?.is_some() {
3199            bail!("cannot drop a subtask with an undelivered event");
3200        }
3201
3202        if delete {
3203            waitable.delete_from(concurrent_state)?;
3204        }
3205
3206        // Since waitables can neither be passed between instances nor forged,
3207        // this should never fail unless there's a bug in Wasmtime, but we check
3208        // here to be sure:
3209        assert_eq!(
3210            expected_caller_instance,
3211            RuntimeInstance {
3212                instance: self.id().instance(),
3213                index: caller_instance
3214            }
3215        );
3216        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3217        Ok(())
3218    }
3219
3220    /// Implements the `waitable-set.wait` intrinsic.
3221    pub(crate) fn waitable_set_wait(
3222        self,
3223        store: &mut StoreOpaque,
3224        caller: RuntimeComponentInstanceIndex,
3225        options: OptionsIndex,
3226        set: u32,
3227        payload: u32,
3228    ) -> Result<u32> {
3229        self.check_may_leave(store, caller)?;
3230
3231        if !self.options(store, options).async_ {
3232            // The caller may only call `waitable-set.wait` from an async task
3233            // (i.e. a task created via a call to an async export).
3234            // Otherwise, we'll trap.
3235            store.check_blocking()?;
3236        }
3237
3238        let &CanonicalOptions {
3239            cancellable,
3240            instance: caller_instance,
3241            ..
3242        } = &self.id().get(store).component().env_component().options[options];
3243        let rep = store
3244            .handle_table(RuntimeInstance {
3245                instance: self.id().instance(),
3246                index: caller_instance,
3247            })
3248            .waitable_set_rep(set)?;
3249
3250        self.waitable_check(
3251            store,
3252            cancellable,
3253            WaitableCheck::Wait,
3254            WaitableCheckParams {
3255                set: TableId::new(rep),
3256                options,
3257                payload,
3258            },
3259        )
3260    }
3261
3262    /// Implements the `waitable-set.poll` intrinsic.
3263    pub(crate) fn waitable_set_poll(
3264        self,
3265        store: &mut StoreOpaque,
3266        caller: RuntimeComponentInstanceIndex,
3267        options: OptionsIndex,
3268        set: u32,
3269        payload: u32,
3270    ) -> Result<u32> {
3271        self.check_may_leave(store, caller)?;
3272
3273        let &CanonicalOptions {
3274            cancellable,
3275            instance: caller_instance,
3276            ..
3277        } = &self.id().get(store).component().env_component().options[options];
3278        let rep = store
3279            .handle_table(RuntimeInstance {
3280                instance: self.id().instance(),
3281                index: caller_instance,
3282            })
3283            .waitable_set_rep(set)?;
3284
3285        self.waitable_check(
3286            store,
3287            cancellable,
3288            WaitableCheck::Poll,
3289            WaitableCheckParams {
3290                set: TableId::new(rep),
3291                options,
3292                payload,
3293            },
3294        )
3295    }
3296
3297    /// Implements the `thread.index` intrinsic.
3298    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3299        let thread_id = store.concurrent_state_mut().guest_thread.unwrap().thread;
3300        // The unwrap is safe because `instance_rep` must be `Some` by this point
3301        Ok(store
3302            .concurrent_state_mut()
3303            .get_mut(thread_id)?
3304            .instance_rep
3305            .unwrap())
3306    }
3307
3308    /// Implements the `thread.new-indirect` intrinsic.
3309    pub(crate) fn thread_new_indirect<T: 'static>(
3310        self,
3311        mut store: StoreContextMut<T>,
3312        runtime_instance: RuntimeComponentInstanceIndex,
3313        _func_ty_idx: TypeFuncIndex, // currently unused
3314        start_func_table_idx: RuntimeTableIndex,
3315        start_func_idx: u32,
3316        context: i32,
3317    ) -> Result<u32> {
3318        self.check_may_leave(store.0, runtime_instance)?;
3319
3320        log::trace!("creating new thread");
3321
3322        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3323        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3324        let callee = instance
3325            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3326            .ok_or_else(|| {
3327                format_err!("the start function index points to an uninitialized function")
3328            })?;
3329        if callee.type_index(store.0) != start_func_ty.type_index() {
3330            bail!(
3331                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3332            );
3333        }
3334
3335        let token = StoreToken::new(store.as_context_mut());
3336        let start_func = Box::new(
3337            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3338                let old_thread = store.set_thread(Some(guest_thread));
3339                log::trace!(
3340                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3341                );
3342
3343                store.maybe_push_call_context(guest_thread.task)?;
3344
3345                let mut store = token.as_context_mut(store);
3346                let mut params = [ValRaw::i32(context)];
3347                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3348                // on a separate fiber if we're running in an async store.
3349                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3350
3351                store.0.maybe_pop_call_context(guest_thread.task)?;
3352
3353                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3354                log::trace!("explicit thread {guest_thread:?} completed");
3355                let state = store.0.concurrent_state_mut();
3356                let task = state.get_mut(guest_thread.task)?;
3357                if task.threads.is_empty() && !task.returned_or_cancelled() {
3358                    bail!(Trap::NoAsyncResult);
3359                }
3360                store.0.set_thread(old_thread);
3361                let state = store.0.concurrent_state_mut();
3362                old_thread
3363                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3364                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3365                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3366                }
3367                log::trace!("thread start: restored {old_thread:?} as current thread");
3368
3369                Ok(())
3370            },
3371        );
3372
3373        let state = store.0.concurrent_state_mut();
3374        let current_thread = state.guest_thread.unwrap();
3375        let parent_task = current_thread.task;
3376
3377        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3378        let thread_id = state.push(new_thread)?;
3379        state.get_mut(parent_task)?.threads.insert(thread_id);
3380
3381        log::trace!("new thread with id {thread_id:?} created");
3382
3383        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3384    }
3385
3386    pub(crate) fn resume_suspended_thread(
3387        self,
3388        store: &mut StoreOpaque,
3389        runtime_instance: RuntimeComponentInstanceIndex,
3390        thread_idx: u32,
3391        high_priority: bool,
3392    ) -> Result<()> {
3393        let thread_id =
3394            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3395        let state = store.concurrent_state_mut();
3396        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3397        let thread = state.get_mut(guest_thread.thread)?;
3398
3399        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3400            GuestThreadState::NotStartedExplicit(start_func) => {
3401                log::trace!("starting thread {guest_thread:?}");
3402                let guest_call = WorkItem::GuestCall(GuestCall {
3403                    thread: guest_thread,
3404                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3405                        start_func(store, guest_thread)
3406                    })),
3407                });
3408                store
3409                    .concurrent_state_mut()
3410                    .push_work_item(guest_call, high_priority);
3411            }
3412            GuestThreadState::Suspended(fiber) => {
3413                log::trace!("resuming thread {thread_id:?} that was suspended");
3414                store
3415                    .concurrent_state_mut()
3416                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3417            }
3418            _ => {
3419                bail!("cannot resume thread which is not suspended");
3420            }
3421        }
3422        Ok(())
3423    }
3424
3425    fn add_guest_thread_to_instance_table(
3426        self,
3427        thread_id: TableId<GuestThread>,
3428        store: &mut StoreOpaque,
3429        runtime_instance: RuntimeComponentInstanceIndex,
3430    ) -> Result<u32> {
3431        let guest_id = store
3432            .handle_table(RuntimeInstance {
3433                instance: self.id().instance(),
3434                index: runtime_instance,
3435            })
3436            .guest_thread_insert(thread_id.rep())?;
3437        store
3438            .concurrent_state_mut()
3439            .get_mut(thread_id)?
3440            .instance_rep = Some(guest_id);
3441        Ok(guest_id)
3442    }
3443
3444    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3445    /// and `thread.switch-to` intrinsics.
3446    pub(crate) fn suspension_intrinsic(
3447        self,
3448        store: &mut StoreOpaque,
3449        caller: RuntimeComponentInstanceIndex,
3450        cancellable: bool,
3451        yielding: bool,
3452        to_thread: Option<u32>,
3453    ) -> Result<WaitResult> {
3454        self.check_may_leave(store, caller)?;
3455
3456        if to_thread.is_none() {
3457            let state = store.concurrent_state_mut();
3458            if yielding {
3459                // This is a `thread.yield` call
3460                if !state.may_block(state.guest_thread.unwrap().task) {
3461                    // The spec defines `thread.yield` to be a no-op in a
3462                    // non-blocking context, so we return immediately without giving
3463                    // any other thread a chance to run.
3464                    return Ok(WaitResult::Completed);
3465                }
3466            } else {
3467                // The caller may only call `thread.suspend` from an async task
3468                // (i.e. a task created via a call to an async export).
3469                // Otherwise, we'll trap.
3470                store.check_blocking()?;
3471            }
3472        }
3473
3474        // There could be a pending cancellation from a previous uncancellable wait
3475        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3476            return Ok(WaitResult::Cancelled);
3477        }
3478
3479        if let Some(thread) = to_thread {
3480            self.resume_suspended_thread(store, caller, thread, true)?;
3481        }
3482
3483        let state = store.concurrent_state_mut();
3484        let guest_thread = state.guest_thread.unwrap();
3485        let reason = if yielding {
3486            SuspendReason::Yielding {
3487                thread: guest_thread,
3488                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3489                // we're handling a `thread.yield-to` call; otherwise it would
3490                // panic if we called it in a non-blocking context.
3491                skip_may_block_check: to_thread.is_some(),
3492            }
3493        } else {
3494            SuspendReason::ExplicitlySuspending {
3495                thread: guest_thread,
3496                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3497                // we're handling a `thread.switch-to` call; otherwise it would
3498                // panic if we called it in a non-blocking context.
3499                skip_may_block_check: to_thread.is_some(),
3500            }
3501        };
3502
3503        store.suspend(reason)?;
3504
3505        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3506            Ok(WaitResult::Cancelled)
3507        } else {
3508            Ok(WaitResult::Completed)
3509        }
3510    }
3511
3512    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3513    fn waitable_check(
3514        self,
3515        store: &mut StoreOpaque,
3516        cancellable: bool,
3517        check: WaitableCheck,
3518        params: WaitableCheckParams,
3519    ) -> Result<u32> {
3520        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3521
3522        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3523
3524        let state = store.concurrent_state_mut();
3525        let task = state.get_mut(guest_thread.task)?;
3526
3527        // If we're waiting, and there are no events immediately available,
3528        // suspend the fiber until that changes.
3529        match &check {
3530            WaitableCheck::Wait => {
3531                let set = params.set;
3532
3533                if (task.event.is_none()
3534                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3535                    && state.get_mut(set)?.ready.is_empty()
3536                {
3537                    if cancellable {
3538                        let old = state
3539                            .get_mut(guest_thread.thread)?
3540                            .wake_on_cancel
3541                            .replace(set);
3542                        assert!(old.is_none());
3543                    }
3544
3545                    store.suspend(SuspendReason::Waiting {
3546                        set,
3547                        thread: guest_thread,
3548                        skip_may_block_check: false,
3549                    })?;
3550                }
3551            }
3552            WaitableCheck::Poll => {}
3553        }
3554
3555        log::trace!(
3556            "waitable check for {guest_thread:?}; set {:?}, part two",
3557            params.set
3558        );
3559
3560        // Deliver any pending events to the guest and return.
3561        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3562
3563        let (ordinal, handle, result) = match &check {
3564            WaitableCheck::Wait => {
3565                let (event, waitable) = event.unwrap();
3566                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3567                let (ordinal, result) = event.parts();
3568                (ordinal, handle, result)
3569            }
3570            WaitableCheck::Poll => {
3571                if let Some((event, waitable)) = event {
3572                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3573                    let (ordinal, result) = event.parts();
3574                    (ordinal, handle, result)
3575                } else {
3576                    log::trace!(
3577                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3578                        guest_thread.task,
3579                        params.set
3580                    );
3581                    let (ordinal, result) = Event::None.parts();
3582                    (ordinal, 0, result)
3583                }
3584            }
3585        };
3586        let memory = self.options_memory_mut(store, params.options);
3587        let ptr = func::validate_inbounds_dynamic(
3588            &CanonicalAbiInfo::POINTER_PAIR,
3589            memory,
3590            &ValRaw::u32(params.payload),
3591        )?;
3592        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3593        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3594        Ok(ordinal)
3595    }
3596
3597    /// Implements the `subtask.cancel` intrinsic.
3598    pub(crate) fn subtask_cancel(
3599        self,
3600        store: &mut StoreOpaque,
3601        caller_instance: RuntimeComponentInstanceIndex,
3602        async_: bool,
3603        task_id: u32,
3604    ) -> Result<u32> {
3605        self.check_may_leave(store, caller_instance)?;
3606
3607        if !async_ {
3608            // The caller may only sync call `subtask.cancel` from an async task
3609            // (i.e. a task created via a call to an async export).  Otherwise,
3610            // we'll trap.
3611            store.check_blocking()?;
3612        }
3613
3614        let (rep, is_host) = store
3615            .handle_table(RuntimeInstance {
3616                instance: self.id().instance(),
3617                index: caller_instance,
3618            })
3619            .subtask_rep(task_id)?;
3620        let (waitable, expected_caller_instance) = if is_host {
3621            let id = TableId::<HostTask>::new(rep);
3622            (
3623                Waitable::Host(id),
3624                store.concurrent_state_mut().get_mut(id)?.caller_instance,
3625            )
3626        } else {
3627            let id = TableId::<GuestTask>::new(rep);
3628            if let &Caller::Guest { thread } = &store.concurrent_state_mut().get_mut(id)?.caller {
3629                (
3630                    Waitable::Guest(id),
3631                    store.concurrent_state_mut().get_mut(thread.task)?.instance,
3632                )
3633            } else {
3634                unreachable!()
3635            }
3636        };
3637        // Since waitables can neither be passed between instances nor forged,
3638        // this should never fail unless there's a bug in Wasmtime, but we check
3639        // here to be sure:
3640        assert_eq!(
3641            expected_caller_instance,
3642            RuntimeInstance {
3643                instance: self.id().instance(),
3644                index: caller_instance
3645            }
3646        );
3647
3648        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3649
3650        let concurrent_state = store.concurrent_state_mut();
3651        if let Waitable::Host(host_task) = waitable {
3652            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3653                handle.abort();
3654                return Ok(Status::ReturnCancelled as u32);
3655            }
3656        } else {
3657            let caller = concurrent_state.guest_thread.unwrap();
3658            let guest_task = TableId::<GuestTask>::new(rep);
3659            let task = concurrent_state.get_mut(guest_task)?;
3660            if !task.already_lowered_parameters() {
3661                // The task is in a `starting` state, meaning it hasn't run at
3662                // all yet.  Here we update its fields to indicate that it is
3663                // ready to delete immediately once `subtask.drop` is called.
3664                task.lower_params = None;
3665                task.lift_result = None;
3666                task.exited = true;
3667
3668                let instance = task.instance;
3669
3670                assert_eq!(1, task.threads.len());
3671                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3672                let concurrent_state = store.concurrent_state_mut();
3673                concurrent_state.delete(thread)?;
3674                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3675
3676                // Not yet started; cancel and remove from pending
3677                let pending = &mut store.instance_state(instance).pending;
3678                let pending_count = pending.len();
3679                pending.retain(|thread, _| thread.task != guest_task);
3680                // If there were no pending threads for this task, we're in an error state
3681                if pending.len() == pending_count {
3682                    bail!("`subtask.cancel` called after terminal status delivered");
3683                }
3684                return Ok(Status::StartCancelled as u32);
3685            } else if !task.returned_or_cancelled() {
3686                // Started, but not yet returned or cancelled; send the
3687                // `CANCELLED` event
3688                task.cancel_sent = true;
3689                // Note that this might overwrite an event that was set earlier
3690                // (e.g. `Event::None` if the task is yielding, or
3691                // `Event::Cancelled` if it was already cancelled), but that's
3692                // okay -- this should supersede the previous state.
3693                task.event = Some(Event::Cancelled);
3694                for thread in task.threads.clone() {
3695                    let thread = QualifiedThreadId {
3696                        task: guest_task,
3697                        thread,
3698                    };
3699                    if let Some(set) = concurrent_state
3700                        .get_mut(thread.thread)
3701                        .unwrap()
3702                        .wake_on_cancel
3703                        .take()
3704                    {
3705                        let item = match concurrent_state
3706                            .get_mut(set)?
3707                            .waiting
3708                            .remove(&thread)
3709                            .unwrap()
3710                        {
3711                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3712                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3713                                thread,
3714                                kind: GuestCallKind::DeliverEvent {
3715                                    instance,
3716                                    set: None,
3717                                },
3718                            }),
3719                        };
3720                        concurrent_state.push_high_priority(item);
3721
3722                        store.suspend(SuspendReason::Yielding {
3723                            thread: caller,
3724                            // `subtask.cancel` is not allowed to be called in a
3725                            // sync context, so we cannot skip the may-block check.
3726                            skip_may_block_check: false,
3727                        })?;
3728                        break;
3729                    }
3730                }
3731
3732                let concurrent_state = store.concurrent_state_mut();
3733                let task = concurrent_state.get_mut(guest_task)?;
3734                if !task.returned_or_cancelled() {
3735                    if async_ {
3736                        return Ok(BLOCKED);
3737                    } else {
3738                        store.wait_for_event(Waitable::Guest(guest_task))?;
3739                    }
3740                }
3741            }
3742        }
3743
3744        let event = waitable.take_event(store.concurrent_state_mut())?;
3745        if let Some(Event::Subtask {
3746            status: status @ (Status::Returned | Status::ReturnCancelled),
3747        }) = event
3748        {
3749            Ok(status as u32)
3750        } else {
3751            bail!("`subtask.cancel` called after terminal status delivered");
3752        }
3753    }
3754
3755    pub(crate) fn context_get(
3756        self,
3757        store: &mut StoreOpaque,
3758        caller: RuntimeComponentInstanceIndex,
3759        slot: u32,
3760    ) -> Result<u32> {
3761        self.check_may_leave(store, caller)?;
3762
3763        store.concurrent_state_mut().context_get(slot)
3764    }
3765
3766    pub(crate) fn context_set(
3767        self,
3768        store: &mut StoreOpaque,
3769        caller: RuntimeComponentInstanceIndex,
3770        slot: u32,
3771        value: u32,
3772    ) -> Result<()> {
3773        self.check_may_leave(store, caller)?;
3774
3775        store.concurrent_state_mut().context_set(slot, value)
3776    }
3777}
3778
3779/// Trait representing component model ABI async intrinsics and fused adapter
3780/// helper functions.
3781///
3782/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3783/// which must be valid for at least the duration of the call (and possibly for
3784/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3785/// pointers used for async calls).
3786pub trait VMComponentAsyncStore {
3787    /// A helper function for fused adapter modules involving calls where the
3788    /// one of the caller or callee is async.
3789    ///
3790    /// This helper is not used when the caller and callee both use the sync
3791    /// ABI, only when at least one is async is this used.
3792    unsafe fn prepare_call(
3793        &mut self,
3794        instance: Instance,
3795        memory: *mut VMMemoryDefinition,
3796        start: *mut VMFuncRef,
3797        return_: *mut VMFuncRef,
3798        caller_instance: RuntimeComponentInstanceIndex,
3799        callee_instance: RuntimeComponentInstanceIndex,
3800        task_return_type: TypeTupleIndex,
3801        callee_async: bool,
3802        string_encoding: u8,
3803        result_count: u32,
3804        storage: *mut ValRaw,
3805        storage_len: usize,
3806    ) -> Result<()>;
3807
3808    /// A helper function for fused adapter modules involving calls where the
3809    /// caller is sync-lowered but the callee is async-lifted.
3810    unsafe fn sync_start(
3811        &mut self,
3812        instance: Instance,
3813        callback: *mut VMFuncRef,
3814        callee: *mut VMFuncRef,
3815        param_count: u32,
3816        storage: *mut MaybeUninit<ValRaw>,
3817        storage_len: usize,
3818    ) -> Result<()>;
3819
3820    /// A helper function for fused adapter modules involving calls where the
3821    /// caller is async-lowered.
3822    unsafe fn async_start(
3823        &mut self,
3824        instance: Instance,
3825        callback: *mut VMFuncRef,
3826        post_return: *mut VMFuncRef,
3827        callee: *mut VMFuncRef,
3828        param_count: u32,
3829        result_count: u32,
3830        flags: u32,
3831    ) -> Result<u32>;
3832
3833    /// The `future.write` intrinsic.
3834    fn future_write(
3835        &mut self,
3836        instance: Instance,
3837        caller: RuntimeComponentInstanceIndex,
3838        ty: TypeFutureTableIndex,
3839        options: OptionsIndex,
3840        future: u32,
3841        address: u32,
3842    ) -> Result<u32>;
3843
3844    /// The `future.read` intrinsic.
3845    fn future_read(
3846        &mut self,
3847        instance: Instance,
3848        caller: RuntimeComponentInstanceIndex,
3849        ty: TypeFutureTableIndex,
3850        options: OptionsIndex,
3851        future: u32,
3852        address: u32,
3853    ) -> Result<u32>;
3854
3855    /// The `future.drop-writable` intrinsic.
3856    fn future_drop_writable(
3857        &mut self,
3858        instance: Instance,
3859        caller: RuntimeComponentInstanceIndex,
3860        ty: TypeFutureTableIndex,
3861        writer: u32,
3862    ) -> Result<()>;
3863
3864    /// The `stream.write` intrinsic.
3865    fn stream_write(
3866        &mut self,
3867        instance: Instance,
3868        caller: RuntimeComponentInstanceIndex,
3869        ty: TypeStreamTableIndex,
3870        options: OptionsIndex,
3871        stream: u32,
3872        address: u32,
3873        count: u32,
3874    ) -> Result<u32>;
3875
3876    /// The `stream.read` intrinsic.
3877    fn stream_read(
3878        &mut self,
3879        instance: Instance,
3880        caller: RuntimeComponentInstanceIndex,
3881        ty: TypeStreamTableIndex,
3882        options: OptionsIndex,
3883        stream: u32,
3884        address: u32,
3885        count: u32,
3886    ) -> Result<u32>;
3887
3888    /// The "fast-path" implementation of the `stream.write` intrinsic for
3889    /// "flat" (i.e. memcpy-able) payloads.
3890    fn flat_stream_write(
3891        &mut self,
3892        instance: Instance,
3893        caller: RuntimeComponentInstanceIndex,
3894        ty: TypeStreamTableIndex,
3895        options: OptionsIndex,
3896        payload_size: u32,
3897        payload_align: u32,
3898        stream: u32,
3899        address: u32,
3900        count: u32,
3901    ) -> Result<u32>;
3902
3903    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3904    /// (i.e. memcpy-able) payloads.
3905    fn flat_stream_read(
3906        &mut self,
3907        instance: Instance,
3908        caller: RuntimeComponentInstanceIndex,
3909        ty: TypeStreamTableIndex,
3910        options: OptionsIndex,
3911        payload_size: u32,
3912        payload_align: u32,
3913        stream: u32,
3914        address: u32,
3915        count: u32,
3916    ) -> Result<u32>;
3917
3918    /// The `stream.drop-writable` intrinsic.
3919    fn stream_drop_writable(
3920        &mut self,
3921        instance: Instance,
3922        caller: RuntimeComponentInstanceIndex,
3923        ty: TypeStreamTableIndex,
3924        writer: u32,
3925    ) -> Result<()>;
3926
3927    /// The `error-context.debug-message` intrinsic.
3928    fn error_context_debug_message(
3929        &mut self,
3930        instance: Instance,
3931        caller: RuntimeComponentInstanceIndex,
3932        ty: TypeComponentLocalErrorContextTableIndex,
3933        options: OptionsIndex,
3934        err_ctx_handle: u32,
3935        debug_msg_address: u32,
3936    ) -> Result<()>;
3937
3938    /// The `thread.new-indirect` intrinsic
3939    fn thread_new_indirect(
3940        &mut self,
3941        instance: Instance,
3942        caller: RuntimeComponentInstanceIndex,
3943        func_ty_idx: TypeFuncIndex,
3944        start_func_table_idx: RuntimeTableIndex,
3945        start_func_idx: u32,
3946        context: i32,
3947    ) -> Result<u32>;
3948}
3949
3950/// SAFETY: See trait docs.
3951impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3952    unsafe fn prepare_call(
3953        &mut self,
3954        instance: Instance,
3955        memory: *mut VMMemoryDefinition,
3956        start: *mut VMFuncRef,
3957        return_: *mut VMFuncRef,
3958        caller_instance: RuntimeComponentInstanceIndex,
3959        callee_instance: RuntimeComponentInstanceIndex,
3960        task_return_type: TypeTupleIndex,
3961        callee_async: bool,
3962        string_encoding: u8,
3963        result_count_or_max_if_async: u32,
3964        storage: *mut ValRaw,
3965        storage_len: usize,
3966    ) -> Result<()> {
3967        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3968        // this method will have ensured that `storage` is a valid
3969        // pointer containing at least `storage_len` items.
3970        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3971
3972        unsafe {
3973            instance.prepare_call(
3974                StoreContextMut(self),
3975                start,
3976                return_,
3977                caller_instance,
3978                callee_instance,
3979                task_return_type,
3980                callee_async,
3981                memory,
3982                string_encoding,
3983                match result_count_or_max_if_async {
3984                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3985                        params,
3986                        has_result: false,
3987                    },
3988                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3989                        params,
3990                        has_result: true,
3991                    },
3992                    result_count => CallerInfo::Sync {
3993                        params,
3994                        result_count,
3995                    },
3996                },
3997            )
3998        }
3999    }
4000
4001    unsafe fn sync_start(
4002        &mut self,
4003        instance: Instance,
4004        callback: *mut VMFuncRef,
4005        callee: *mut VMFuncRef,
4006        param_count: u32,
4007        storage: *mut MaybeUninit<ValRaw>,
4008        storage_len: usize,
4009    ) -> Result<()> {
4010        unsafe {
4011            instance
4012                .start_call(
4013                    StoreContextMut(self),
4014                    callback,
4015                    ptr::null_mut(),
4016                    callee,
4017                    param_count,
4018                    1,
4019                    START_FLAG_ASYNC_CALLEE,
4020                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4021                    // this method will have ensured that `storage` is a valid
4022                    // pointer containing at least `storage_len` items.
4023                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
4024                )
4025                .map(drop)
4026        }
4027    }
4028
4029    unsafe fn async_start(
4030        &mut self,
4031        instance: Instance,
4032        callback: *mut VMFuncRef,
4033        post_return: *mut VMFuncRef,
4034        callee: *mut VMFuncRef,
4035        param_count: u32,
4036        result_count: u32,
4037        flags: u32,
4038    ) -> Result<u32> {
4039        unsafe {
4040            instance.start_call(
4041                StoreContextMut(self),
4042                callback,
4043                post_return,
4044                callee,
4045                param_count,
4046                result_count,
4047                flags,
4048                None,
4049            )
4050        }
4051    }
4052
4053    fn future_write(
4054        &mut self,
4055        instance: Instance,
4056        caller: RuntimeComponentInstanceIndex,
4057        ty: TypeFutureTableIndex,
4058        options: OptionsIndex,
4059        future: u32,
4060        address: u32,
4061    ) -> Result<u32> {
4062        instance.check_may_leave(self, caller)?;
4063
4064        instance
4065            .guest_write(
4066                StoreContextMut(self),
4067                caller,
4068                TransmitIndex::Future(ty),
4069                options,
4070                None,
4071                future,
4072                address,
4073                1,
4074            )
4075            .map(|result| result.encode())
4076    }
4077
4078    fn future_read(
4079        &mut self,
4080        instance: Instance,
4081        caller: RuntimeComponentInstanceIndex,
4082        ty: TypeFutureTableIndex,
4083        options: OptionsIndex,
4084        future: u32,
4085        address: u32,
4086    ) -> Result<u32> {
4087        instance.check_may_leave(self, caller)?;
4088
4089        instance
4090            .guest_read(
4091                StoreContextMut(self),
4092                caller,
4093                TransmitIndex::Future(ty),
4094                options,
4095                None,
4096                future,
4097                address,
4098                1,
4099            )
4100            .map(|result| result.encode())
4101    }
4102
4103    fn stream_write(
4104        &mut self,
4105        instance: Instance,
4106        caller: RuntimeComponentInstanceIndex,
4107        ty: TypeStreamTableIndex,
4108        options: OptionsIndex,
4109        stream: u32,
4110        address: u32,
4111        count: u32,
4112    ) -> Result<u32> {
4113        instance.check_may_leave(self, caller)?;
4114
4115        instance
4116            .guest_write(
4117                StoreContextMut(self),
4118                caller,
4119                TransmitIndex::Stream(ty),
4120                options,
4121                None,
4122                stream,
4123                address,
4124                count,
4125            )
4126            .map(|result| result.encode())
4127    }
4128
4129    fn stream_read(
4130        &mut self,
4131        instance: Instance,
4132        caller: RuntimeComponentInstanceIndex,
4133        ty: TypeStreamTableIndex,
4134        options: OptionsIndex,
4135        stream: u32,
4136        address: u32,
4137        count: u32,
4138    ) -> Result<u32> {
4139        instance.check_may_leave(self, caller)?;
4140
4141        instance
4142            .guest_read(
4143                StoreContextMut(self),
4144                caller,
4145                TransmitIndex::Stream(ty),
4146                options,
4147                None,
4148                stream,
4149                address,
4150                count,
4151            )
4152            .map(|result| result.encode())
4153    }
4154
4155    fn future_drop_writable(
4156        &mut self,
4157        instance: Instance,
4158        caller: RuntimeComponentInstanceIndex,
4159        ty: TypeFutureTableIndex,
4160        writer: u32,
4161    ) -> Result<()> {
4162        instance.check_may_leave(self, caller)?;
4163
4164        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4165    }
4166
4167    fn flat_stream_write(
4168        &mut self,
4169        instance: Instance,
4170        caller: RuntimeComponentInstanceIndex,
4171        ty: TypeStreamTableIndex,
4172        options: OptionsIndex,
4173        payload_size: u32,
4174        payload_align: u32,
4175        stream: u32,
4176        address: u32,
4177        count: u32,
4178    ) -> Result<u32> {
4179        instance.check_may_leave(self, caller)?;
4180
4181        instance
4182            .guest_write(
4183                StoreContextMut(self),
4184                caller,
4185                TransmitIndex::Stream(ty),
4186                options,
4187                Some(FlatAbi {
4188                    size: payload_size,
4189                    align: payload_align,
4190                }),
4191                stream,
4192                address,
4193                count,
4194            )
4195            .map(|result| result.encode())
4196    }
4197
4198    fn flat_stream_read(
4199        &mut self,
4200        instance: Instance,
4201        caller: RuntimeComponentInstanceIndex,
4202        ty: TypeStreamTableIndex,
4203        options: OptionsIndex,
4204        payload_size: u32,
4205        payload_align: u32,
4206        stream: u32,
4207        address: u32,
4208        count: u32,
4209    ) -> Result<u32> {
4210        instance.check_may_leave(self, caller)?;
4211
4212        instance
4213            .guest_read(
4214                StoreContextMut(self),
4215                caller,
4216                TransmitIndex::Stream(ty),
4217                options,
4218                Some(FlatAbi {
4219                    size: payload_size,
4220                    align: payload_align,
4221                }),
4222                stream,
4223                address,
4224                count,
4225            )
4226            .map(|result| result.encode())
4227    }
4228
4229    fn stream_drop_writable(
4230        &mut self,
4231        instance: Instance,
4232        caller: RuntimeComponentInstanceIndex,
4233        ty: TypeStreamTableIndex,
4234        writer: u32,
4235    ) -> Result<()> {
4236        instance.check_may_leave(self, caller)?;
4237
4238        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4239    }
4240
4241    fn error_context_debug_message(
4242        &mut self,
4243        instance: Instance,
4244        caller: RuntimeComponentInstanceIndex,
4245        ty: TypeComponentLocalErrorContextTableIndex,
4246        options: OptionsIndex,
4247        err_ctx_handle: u32,
4248        debug_msg_address: u32,
4249    ) -> Result<()> {
4250        instance.check_may_leave(self, caller)?;
4251
4252        instance.error_context_debug_message(
4253            StoreContextMut(self),
4254            ty,
4255            options,
4256            err_ctx_handle,
4257            debug_msg_address,
4258        )
4259    }
4260
4261    fn thread_new_indirect(
4262        &mut self,
4263        instance: Instance,
4264        caller: RuntimeComponentInstanceIndex,
4265        func_ty_idx: TypeFuncIndex,
4266        start_func_table_idx: RuntimeTableIndex,
4267        start_func_idx: u32,
4268        context: i32,
4269    ) -> Result<u32> {
4270        instance.thread_new_indirect(
4271            StoreContextMut(self),
4272            caller,
4273            func_ty_idx,
4274            start_func_table_idx,
4275            start_func_idx,
4276            context,
4277        )
4278    }
4279}
4280
4281type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4282
4283/// Represents the state of a pending host task.
4284struct HostTask {
4285    common: WaitableCommon,
4286    caller_instance: RuntimeInstance,
4287    join_handle: Option<JoinHandle>,
4288}
4289
4290impl HostTask {
4291    fn new(caller_instance: RuntimeInstance, join_handle: Option<JoinHandle>) -> Self {
4292        Self {
4293            common: WaitableCommon::default(),
4294            caller_instance,
4295            join_handle,
4296        }
4297    }
4298}
4299
4300impl TableDebug for HostTask {
4301    fn type_name() -> &'static str {
4302        "HostTask"
4303    }
4304}
4305
4306type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4307
4308/// Represents the caller of a given guest task.
4309enum Caller {
4310    /// The host called the guest task.
4311    Host {
4312        /// If present, may be used to deliver the result.
4313        tx: Option<oneshot::Sender<LiftedResult>>,
4314        /// Channel to notify once all subtasks spawned by this caller have
4315        /// completed.
4316        ///
4317        /// Note that we'll never actually send anything to this channel;
4318        /// dropping it when the refcount goes to zero is sufficient to notify
4319        /// the receiver.
4320        exit_tx: Arc<oneshot::Sender<()>>,
4321        /// If true, there's a host future that must be dropped before the task
4322        /// can be deleted.
4323        host_future_present: bool,
4324        /// If true, call `post-return` function (if any) automatically.
4325        call_post_return_automatically: bool,
4326        /// If `Some`, represents the `QualifiedThreadId` caller of the host
4327        /// function which called back into a guest.  Note that this thread
4328        /// could belong to an entirely unrelated top-level component instance
4329        /// than the one the host called into.
4330        caller: Option<QualifiedThreadId>,
4331    },
4332    /// Another guest thread called the guest task
4333    Guest {
4334        /// The id of the caller
4335        thread: QualifiedThreadId,
4336    },
4337}
4338
4339/// Represents a closure and related canonical ABI parameters required to
4340/// validate a `task.return` call at runtime and lift the result.
4341struct LiftResult {
4342    lift: RawLift,
4343    ty: TypeTupleIndex,
4344    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4345    string_encoding: StringEncoding,
4346}
4347
4348/// The table ID for a guest thread, qualified by the task to which it belongs.
4349///
4350/// This exists to minimize table lookups and the necessity to pass stores around mutably
4351/// for the common case of identifying the task to which a thread belongs.
4352#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4353struct QualifiedThreadId {
4354    task: TableId<GuestTask>,
4355    thread: TableId<GuestThread>,
4356}
4357
4358impl QualifiedThreadId {
4359    fn qualify(
4360        state: &mut ConcurrentState,
4361        thread: TableId<GuestThread>,
4362    ) -> Result<QualifiedThreadId> {
4363        Ok(QualifiedThreadId {
4364            task: state.get_mut(thread)?.parent_task,
4365            thread,
4366        })
4367    }
4368}
4369
4370impl fmt::Debug for QualifiedThreadId {
4371    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4372        f.debug_tuple("QualifiedThreadId")
4373            .field(&self.task.rep())
4374            .field(&self.thread.rep())
4375            .finish()
4376    }
4377}
4378
4379enum GuestThreadState {
4380    NotStartedImplicit,
4381    NotStartedExplicit(
4382        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4383    ),
4384    Running,
4385    Suspended(StoreFiber<'static>),
4386    Pending,
4387    Completed,
4388}
4389pub struct GuestThread {
4390    /// Context-local state used to implement the `context.{get,set}`
4391    /// intrinsics.
4392    context: [u32; 2],
4393    /// The owning guest task.
4394    parent_task: TableId<GuestTask>,
4395    /// If present, indicates that the thread is currently waiting on the
4396    /// specified set but may be cancelled and woken immediately.
4397    wake_on_cancel: Option<TableId<WaitableSet>>,
4398    /// The execution state of this guest thread
4399    state: GuestThreadState,
4400    /// The index of this thread in the component instance's handle table.
4401    /// This must always be `Some` after initialization.
4402    instance_rep: Option<u32>,
4403}
4404
4405impl GuestThread {
4406    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4407    /// handle.
4408    fn from_instance(
4409        state: Pin<&mut ComponentInstance>,
4410        caller_instance: RuntimeComponentInstanceIndex,
4411        guest_thread: u32,
4412    ) -> Result<TableId<Self>> {
4413        let rep = state.instance_states().0[caller_instance]
4414            .handle_table()
4415            .guest_thread_rep(guest_thread)?;
4416        Ok(TableId::new(rep))
4417    }
4418
4419    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4420        Self {
4421            context: [0; 2],
4422            parent_task,
4423            wake_on_cancel: None,
4424            state: GuestThreadState::NotStartedImplicit,
4425            instance_rep: None,
4426        }
4427    }
4428
4429    fn new_explicit(
4430        parent_task: TableId<GuestTask>,
4431        start_func: Box<
4432            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4433        >,
4434    ) -> Self {
4435        Self {
4436            context: [0; 2],
4437            parent_task,
4438            wake_on_cancel: None,
4439            state: GuestThreadState::NotStartedExplicit(start_func),
4440            instance_rep: None,
4441        }
4442    }
4443}
4444
4445impl TableDebug for GuestThread {
4446    fn type_name() -> &'static str {
4447        "GuestThread"
4448    }
4449}
4450
4451enum SyncResult {
4452    NotProduced,
4453    Produced(Option<ValRaw>),
4454    Taken,
4455}
4456
4457impl SyncResult {
4458    fn take(&mut self) -> Option<Option<ValRaw>> {
4459        match mem::replace(self, SyncResult::Taken) {
4460            SyncResult::NotProduced => None,
4461            SyncResult::Produced(val) => Some(val),
4462            SyncResult::Taken => {
4463                panic!("attempted to take a synchronous result that was already taken")
4464            }
4465        }
4466    }
4467}
4468
4469#[derive(Debug)]
4470enum HostFutureState {
4471    NotApplicable,
4472    Live,
4473    Dropped,
4474}
4475
4476/// Represents a pending guest task.
4477pub(crate) struct GuestTask {
4478    /// See `WaitableCommon`
4479    common: WaitableCommon,
4480    /// Closure to lower the parameters passed to this task.
4481    lower_params: Option<RawLower>,
4482    /// See `LiftResult`
4483    lift_result: Option<LiftResult>,
4484    /// A place to stash the type-erased lifted result if it can't be delivered
4485    /// immediately.
4486    result: Option<LiftedResult>,
4487    /// Closure to call the callback function for an async-lifted export, if
4488    /// provided.
4489    callback: Option<CallbackFn>,
4490    /// See `Caller`
4491    caller: Caller,
4492    /// A place to stash the call context for managing resource borrows while
4493    /// switching between guest tasks.
4494    call_context: Option<CallContext>,
4495    /// A place to stash the lowered result for a sync-to-async call until it
4496    /// can be returned to the caller.
4497    sync_result: SyncResult,
4498    /// Whether or not the task has been cancelled (i.e. whether the task is
4499    /// permitted to call `task.cancel`).
4500    cancel_sent: bool,
4501    /// Whether or not we've sent a `Status::Starting` event to any current or
4502    /// future waiters for this waitable.
4503    starting_sent: bool,
4504    /// Pending guest subtasks created by this task (directly or indirectly).
4505    ///
4506    /// This is used to re-parent subtasks which are still running when their
4507    /// parent task is disposed.
4508    subtasks: HashSet<TableId<GuestTask>>,
4509    /// Scratch waitable set used to watch subtasks during synchronous calls.
4510    sync_call_set: TableId<WaitableSet>,
4511    /// The runtime instance to which the exported function for this guest task
4512    /// belongs.
4513    ///
4514    /// Note that the task may do a sync->sync call via a fused adapter which
4515    /// results in that task executing code in a different instance, and it may
4516    /// call host functions and intrinsics from that other instance.
4517    instance: RuntimeInstance,
4518    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4519    /// delivered to this task.
4520    event: Option<Event>,
4521    /// The `ExportIndex` of the guest function being called, if known.
4522    function_index: Option<ExportIndex>,
4523    /// Whether or not the task has exited.
4524    exited: bool,
4525    /// Threads belonging to this task
4526    threads: HashSet<TableId<GuestThread>>,
4527    /// The state of the host future that represents an async task, which must
4528    /// be dropped before we can delete the task.
4529    host_future_state: HostFutureState,
4530    /// Indicates whether this task was created for a call to an async-lifted
4531    /// export.
4532    async_function: bool,
4533}
4534
4535impl GuestTask {
4536    fn already_lowered_parameters(&self) -> bool {
4537        // We reset `lower_params` after we lower the parameters
4538        self.lower_params.is_none()
4539    }
4540
4541    fn returned_or_cancelled(&self) -> bool {
4542        // We reset `lift_result` after we return or exit
4543        self.lift_result.is_none()
4544    }
4545
4546    fn ready_to_delete(&self) -> bool {
4547        let threads_completed = self.threads.is_empty();
4548        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4549        let pending_completion_event = matches!(
4550            self.common.event,
4551            Some(Event::Subtask {
4552                status: Status::Returned | Status::ReturnCancelled
4553            })
4554        );
4555        let ready = threads_completed
4556            && !has_sync_result
4557            && !pending_completion_event
4558            && !matches!(self.host_future_state, HostFutureState::Live);
4559        log::trace!(
4560            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4561            threads_completed,
4562            has_sync_result,
4563            pending_completion_event,
4564            self.host_future_state
4565        );
4566        ready
4567    }
4568
4569    fn new(
4570        state: &mut ConcurrentState,
4571        lower_params: RawLower,
4572        lift_result: LiftResult,
4573        caller: Caller,
4574        callback: Option<CallbackFn>,
4575        instance: RuntimeInstance,
4576        async_function: bool,
4577    ) -> Result<Self> {
4578        let sync_call_set = state.push(WaitableSet::default())?;
4579        let host_future_state = match &caller {
4580            Caller::Guest { .. } => HostFutureState::NotApplicable,
4581            Caller::Host {
4582                host_future_present,
4583                ..
4584            } => {
4585                if *host_future_present {
4586                    HostFutureState::Live
4587                } else {
4588                    HostFutureState::NotApplicable
4589                }
4590            }
4591        };
4592        Ok(Self {
4593            common: WaitableCommon::default(),
4594            lower_params: Some(lower_params),
4595            lift_result: Some(lift_result),
4596            result: None,
4597            callback,
4598            caller,
4599            call_context: Some(CallContext::default()),
4600            sync_result: SyncResult::NotProduced,
4601            cancel_sent: false,
4602            starting_sent: false,
4603            subtasks: HashSet::new(),
4604            sync_call_set,
4605            instance,
4606            event: None,
4607            function_index: None,
4608            exited: false,
4609            threads: HashSet::new(),
4610            host_future_state,
4611            async_function,
4612        })
4613    }
4614
4615    /// Dispose of this guest task, reparenting any pending subtasks to the
4616    /// caller.
4617    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4618        // If there are not-yet-delivered completion events for subtasks in
4619        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4620        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4621            if let Some(Event::Subtask {
4622                status: Status::Returned | Status::ReturnCancelled,
4623            }) = waitable.common(state)?.event
4624            {
4625                waitable.delete_from(state)?;
4626            }
4627        }
4628
4629        assert!(self.threads.is_empty());
4630
4631        state.delete(self.sync_call_set)?;
4632
4633        // Reparent any pending subtasks to the caller.
4634        match &self.caller {
4635            Caller::Guest { thread } => {
4636                let task_mut = state.get_mut(thread.task)?;
4637                let present = task_mut.subtasks.remove(&me);
4638                assert!(present);
4639
4640                for subtask in &self.subtasks {
4641                    task_mut.subtasks.insert(*subtask);
4642                }
4643
4644                for subtask in &self.subtasks {
4645                    state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4646                }
4647            }
4648            Caller::Host {
4649                exit_tx, caller, ..
4650            } => {
4651                for subtask in &self.subtasks {
4652                    state.get_mut(*subtask)?.caller = Caller::Host {
4653                        tx: None,
4654                        // Clone `exit_tx` to ensure that it is only dropped
4655                        // once all transitive subtasks of the host call have
4656                        // exited:
4657                        exit_tx: exit_tx.clone(),
4658                        host_future_present: false,
4659                        call_post_return_automatically: true,
4660                        caller: *caller,
4661                    };
4662                }
4663            }
4664        }
4665
4666        for subtask in self.subtasks {
4667            let task = state.get_mut(subtask)?;
4668            if task.exited && task.ready_to_delete() {
4669                Waitable::Guest(subtask).delete_from(state)?;
4670            }
4671        }
4672
4673        Ok(())
4674    }
4675
4676    fn call_post_return_automatically(&self) -> bool {
4677        matches!(
4678            self.caller,
4679            Caller::Guest { .. }
4680                | Caller::Host {
4681                    call_post_return_automatically: true,
4682                    ..
4683                }
4684        )
4685    }
4686}
4687
4688impl TableDebug for GuestTask {
4689    fn type_name() -> &'static str {
4690        "GuestTask"
4691    }
4692}
4693
4694/// Represents state common to all kinds of waitables.
4695#[derive(Default)]
4696struct WaitableCommon {
4697    /// The currently pending event for this waitable, if any.
4698    event: Option<Event>,
4699    /// The set to which this waitable belongs, if any.
4700    set: Option<TableId<WaitableSet>>,
4701    /// The handle with which the guest refers to this waitable, if any.
4702    handle: Option<u32>,
4703}
4704
4705/// Represents a Component Model Async `waitable`.
4706#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4707enum Waitable {
4708    /// A host task
4709    Host(TableId<HostTask>),
4710    /// A guest task
4711    Guest(TableId<GuestTask>),
4712    /// The read or write end of a stream or future
4713    Transmit(TableId<TransmitHandle>),
4714}
4715
4716impl Waitable {
4717    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4718    /// handle.
4719    fn from_instance(
4720        state: Pin<&mut ComponentInstance>,
4721        caller_instance: RuntimeComponentInstanceIndex,
4722        waitable: u32,
4723    ) -> Result<Self> {
4724        use crate::runtime::vm::component::Waitable;
4725
4726        let (waitable, kind) = state.instance_states().0[caller_instance]
4727            .handle_table()
4728            .waitable_rep(waitable)?;
4729
4730        Ok(match kind {
4731            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4732            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4733            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4734        })
4735    }
4736
4737    /// Retrieve the host-visible identifier for this `Waitable`.
4738    fn rep(&self) -> u32 {
4739        match self {
4740            Self::Host(id) => id.rep(),
4741            Self::Guest(id) => id.rep(),
4742            Self::Transmit(id) => id.rep(),
4743        }
4744    }
4745
4746    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4747    /// remove it from any set it may currently belong to (when `set` is
4748    /// `None`).
4749    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4750        log::trace!("waitable {self:?} join set {set:?}",);
4751
4752        let old = mem::replace(&mut self.common(state)?.set, set);
4753
4754        if let Some(old) = old {
4755            match *self {
4756                Waitable::Host(id) => state.remove_child(id, old),
4757                Waitable::Guest(id) => state.remove_child(id, old),
4758                Waitable::Transmit(id) => state.remove_child(id, old),
4759            }?;
4760
4761            state.get_mut(old)?.ready.remove(self);
4762        }
4763
4764        if let Some(set) = set {
4765            match *self {
4766                Waitable::Host(id) => state.add_child(id, set),
4767                Waitable::Guest(id) => state.add_child(id, set),
4768                Waitable::Transmit(id) => state.add_child(id, set),
4769            }?;
4770
4771            if self.common(state)?.event.is_some() {
4772                self.mark_ready(state)?;
4773            }
4774        }
4775
4776        Ok(())
4777    }
4778
4779    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4780    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4781        Ok(match self {
4782            Self::Host(id) => &mut state.get_mut(*id)?.common,
4783            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4784            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4785        })
4786    }
4787
4788    /// Set or clear the pending event for this waitable and either deliver it
4789    /// to the first waiter, if any, or mark it as ready to be delivered to the
4790    /// next waiter that arrives.
4791    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4792        log::trace!("set event for {self:?}: {event:?}");
4793        self.common(state)?.event = event;
4794        self.mark_ready(state)
4795    }
4796
4797    /// Take the pending event from this waitable, leaving `None` in its place.
4798    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4799        let common = self.common(state)?;
4800        let event = common.event.take();
4801        if let Some(set) = self.common(state)?.set {
4802            state.get_mut(set)?.ready.remove(self);
4803        }
4804
4805        Ok(event)
4806    }
4807
4808    /// Deliver the current event for this waitable to the first waiter, if any,
4809    /// or else mark it as ready to be delivered to the next waiter that
4810    /// arrives.
4811    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4812        if let Some(set) = self.common(state)?.set {
4813            state.get_mut(set)?.ready.insert(*self);
4814            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4815                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4816                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4817
4818                let item = match mode {
4819                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4820                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4821                        thread,
4822                        kind: GuestCallKind::DeliverEvent {
4823                            instance,
4824                            set: Some(set),
4825                        },
4826                    }),
4827                };
4828                state.push_high_priority(item);
4829            }
4830        }
4831        Ok(())
4832    }
4833
4834    /// Remove this waitable from the instance's rep table.
4835    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4836        match self {
4837            Self::Host(task) => {
4838                log::trace!("delete host task {task:?}");
4839                state.delete(*task)?;
4840            }
4841            Self::Guest(task) => {
4842                log::trace!("delete guest task {task:?}");
4843                state.delete(*task)?.dispose(state, *task)?;
4844            }
4845            Self::Transmit(task) => {
4846                state.delete(*task)?;
4847            }
4848        }
4849
4850        Ok(())
4851    }
4852}
4853
4854impl fmt::Debug for Waitable {
4855    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4856        match self {
4857            Self::Host(id) => write!(f, "{id:?}"),
4858            Self::Guest(id) => write!(f, "{id:?}"),
4859            Self::Transmit(id) => write!(f, "{id:?}"),
4860        }
4861    }
4862}
4863
4864/// Represents a Component Model Async `waitable-set`.
4865#[derive(Default)]
4866struct WaitableSet {
4867    /// Which waitables in this set have pending events, if any.
4868    ready: BTreeSet<Waitable>,
4869    /// Which guest threads are currently waiting on this set, if any.
4870    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4871}
4872
4873impl TableDebug for WaitableSet {
4874    fn type_name() -> &'static str {
4875        "WaitableSet"
4876    }
4877}
4878
4879/// Type-erased closure to lower the parameters for a guest task.
4880type RawLower =
4881    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4882
4883/// Type-erased closure to lift the result for a guest task.
4884type RawLift = Box<
4885    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4886>;
4887
4888/// Type erased result of a guest task which may be downcast to the expected
4889/// type by a host caller (or simply ignored in the case of a guest caller; see
4890/// `DummyResult`).
4891type LiftedResult = Box<dyn Any + Send + Sync>;
4892
4893/// Used to return a result from a `LiftFn` when the actual result has already
4894/// been lowered to a guest task's stack and linear memory.
4895struct DummyResult;
4896
4897/// Represents the Component Model Async state of a (sub-)component instance.
4898#[derive(Default)]
4899pub struct ConcurrentInstanceState {
4900    /// Whether backpressure is set for this instance (enabled if >0)
4901    backpressure: u16,
4902    /// Whether this instance can be entered
4903    do_not_enter: bool,
4904    /// Pending calls for this instance which require `Self::backpressure` to be
4905    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4906    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4907}
4908
4909impl ConcurrentInstanceState {
4910    pub fn pending_is_empty(&self) -> bool {
4911        self.pending.is_empty()
4912    }
4913}
4914
4915/// Represents the Component Model Async state of a store.
4916pub struct ConcurrentState {
4917    /// The currently running guest thread, if any.
4918    guest_thread: Option<QualifiedThreadId>,
4919
4920    /// The set of pending host and background tasks, if any.
4921    ///
4922    /// See `ComponentInstance::poll_until` for where we temporarily take this
4923    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4924    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4925    /// The table of waitables, waitable sets, etc.
4926    table: AlwaysMut<ResourceTable>,
4927    /// The "high priority" work queue for this store's event loop.
4928    high_priority: Vec<WorkItem>,
4929    /// The "low priority" work queue for this store's event loop.
4930    low_priority: VecDeque<WorkItem>,
4931    /// A place to stash the reason a fiber is suspending so that the code which
4932    /// resumed it will know under what conditions the fiber should be resumed
4933    /// again.
4934    suspend_reason: Option<SuspendReason>,
4935    /// A cached fiber which is waiting for work to do.
4936    ///
4937    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4938    worker: Option<StoreFiber<'static>>,
4939    /// A place to stash the work item for which we're resuming a worker fiber.
4940    worker_item: Option<WorkerItem>,
4941
4942    /// Reference counts for all component error contexts
4943    ///
4944    /// NOTE: it is possible the global ref count to be *greater* than the sum of
4945    /// (sub)component ref counts as tracked by `error_context_tables`, for
4946    /// example when the host holds one or more references to error contexts.
4947    ///
4948    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4949    /// component-wide representation) of the index into concurrent state for a given
4950    /// stored `ErrorContext`.
4951    ///
4952    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4953    /// as a `TableId<ErrorContextState>`.
4954    global_error_context_ref_counts:
4955        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4956}
4957
4958impl Default for ConcurrentState {
4959    fn default() -> Self {
4960        Self {
4961            guest_thread: None,
4962            table: AlwaysMut::new(ResourceTable::new()),
4963            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4964            high_priority: Vec::new(),
4965            low_priority: VecDeque::new(),
4966            suspend_reason: None,
4967            worker: None,
4968            worker_item: None,
4969            global_error_context_ref_counts: BTreeMap::new(),
4970        }
4971    }
4972}
4973
4974impl ConcurrentState {
4975    /// Take ownership of any fibers and futures owned by this object.
4976    ///
4977    /// This should be used when disposing of the `Store` containing this object
4978    /// in order to gracefully resolve any and all fibers using
4979    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4980    /// use-after-free bugs due to fibers which may still have access to the
4981    /// `Store`.
4982    ///
4983    /// Additionally, the futures collected with this function should be dropped
4984    /// within a `tls::set` call, which will ensure than any futures closing
4985    /// over an `&Accessor` will have access to the store when dropped, allowing
4986    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4987    /// panicking.
4988    ///
4989    /// Note that this will leave the object in an inconsistent and unusable
4990    /// state, so it should only be used just prior to dropping it.
4991    pub(crate) fn take_fibers_and_futures(
4992        &mut self,
4993        fibers: &mut Vec<StoreFiber<'static>>,
4994        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4995    ) {
4996        for entry in self.table.get_mut().iter_mut() {
4997            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4998                for mode in mem::take(&mut set.waiting).into_values() {
4999                    if let WaitMode::Fiber(fiber) = mode {
5000                        fibers.push(fiber);
5001                    }
5002                }
5003            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5004                if let GuestThreadState::Suspended(fiber) =
5005                    mem::replace(&mut thread.state, GuestThreadState::Completed)
5006                {
5007                    fibers.push(fiber);
5008                }
5009            }
5010        }
5011
5012        if let Some(fiber) = self.worker.take() {
5013            fibers.push(fiber);
5014        }
5015
5016        let mut handle_item = |item| match item {
5017            WorkItem::ResumeFiber(fiber) => {
5018                fibers.push(fiber);
5019            }
5020            WorkItem::PushFuture(future) => {
5021                self.futures
5022                    .get_mut()
5023                    .as_mut()
5024                    .unwrap()
5025                    .push(future.into_inner());
5026            }
5027            _ => {}
5028        };
5029
5030        for item in mem::take(&mut self.high_priority) {
5031            handle_item(item);
5032        }
5033        for item in mem::take(&mut self.low_priority) {
5034            handle_item(item);
5035        }
5036
5037        if let Some(them) = self.futures.get_mut().take() {
5038            futures.push(them);
5039        }
5040    }
5041
5042    /// Collect the next set of work items to run. This will be either all
5043    /// high-priority items, or a single low-priority item if there are no
5044    /// high-priority items.
5045    fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
5046        let mut ready = mem::take(&mut self.high_priority);
5047        if ready.is_empty() {
5048            if let Some(item) = self.low_priority.pop_back() {
5049                ready.push(item);
5050            }
5051        }
5052        ready
5053    }
5054
5055    fn push<V: Send + Sync + 'static>(
5056        &mut self,
5057        value: V,
5058    ) -> Result<TableId<V>, ResourceTableError> {
5059        self.table.get_mut().push(value).map(TableId::from)
5060    }
5061
5062    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5063        self.table.get_mut().get_mut(&Resource::from(id))
5064    }
5065
5066    pub fn add_child<T: 'static, U: 'static>(
5067        &mut self,
5068        child: TableId<T>,
5069        parent: TableId<U>,
5070    ) -> Result<(), ResourceTableError> {
5071        self.table
5072            .get_mut()
5073            .add_child(Resource::from(child), Resource::from(parent))
5074    }
5075
5076    pub fn remove_child<T: 'static, U: 'static>(
5077        &mut self,
5078        child: TableId<T>,
5079        parent: TableId<U>,
5080    ) -> Result<(), ResourceTableError> {
5081        self.table
5082            .get_mut()
5083            .remove_child(Resource::from(child), Resource::from(parent))
5084    }
5085
5086    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5087        self.table.get_mut().delete(Resource::from(id))
5088    }
5089
5090    fn push_future(&mut self, future: HostTaskFuture) {
5091        // Note that we can't directly push to `ConcurrentState::futures` here
5092        // since this may be called from a future that's being polled inside
5093        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5094        // so it has exclusive access while polling it.  Therefore, we push a
5095        // work item to the "high priority" queue, which will actually push to
5096        // `ConcurrentState::futures` later.
5097        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5098    }
5099
5100    fn push_high_priority(&mut self, item: WorkItem) {
5101        log::trace!("push high priority: {item:?}");
5102        self.high_priority.push(item);
5103    }
5104
5105    fn push_low_priority(&mut self, item: WorkItem) {
5106        log::trace!("push low priority: {item:?}");
5107        self.low_priority.push_front(item);
5108    }
5109
5110    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5111        if high_priority {
5112            self.push_high_priority(item);
5113        } else {
5114            self.push_low_priority(item);
5115        }
5116    }
5117
5118    /// Implements the `context.get` intrinsic.
5119    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5120        let thread = self.guest_thread.unwrap();
5121        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5122        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5123        Ok(val)
5124    }
5125
5126    /// Implements the `context.set` intrinsic.
5127    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5128        let thread = self.guest_thread.unwrap();
5129        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5130        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5131        Ok(())
5132    }
5133
5134    /// Returns whether there's a pending cancellation on the current guest thread,
5135    /// consuming the event if so.
5136    fn take_pending_cancellation(&mut self) -> bool {
5137        let thread = self.guest_thread.unwrap();
5138        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5139            assert!(matches!(event, Event::Cancelled));
5140            true
5141        } else {
5142            false
5143        }
5144    }
5145
5146    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5147        if self.may_block(task) {
5148            Ok(())
5149        } else {
5150            Err(Trap::CannotBlockSyncTask.into())
5151        }
5152    }
5153
5154    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5155        let task = self.get_mut(task).unwrap();
5156        task.async_function || task.returned_or_cancelled()
5157    }
5158}
5159
5160/// Provide a type hint to compiler about the shape of a parameter lower
5161/// closure.
5162fn for_any_lower<
5163    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5164>(
5165    fun: F,
5166) -> F {
5167    fun
5168}
5169
5170/// Provide a type hint to compiler about the shape of a result lift closure.
5171fn for_any_lift<
5172    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5173>(
5174    fun: F,
5175) -> F {
5176    fun
5177}
5178
5179/// Wrap the specified future in a `poll_fn` which asserts that the future is
5180/// only polled from the event loop of the specified `Store`.
5181///
5182/// See `StoreContextMut::run_concurrent` for details.
5183fn checked<F: Future + Send + 'static>(
5184    id: StoreId,
5185    fut: F,
5186) -> impl Future<Output = F::Output> + Send + 'static {
5187    async move {
5188        let mut fut = pin!(fut);
5189        future::poll_fn(move |cx| {
5190            let message = "\
5191                `Future`s which depend on asynchronous component tasks, streams, or \
5192                futures to complete may only be polled from the event loop of the \
5193                store to which they belong.  Please use \
5194                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5195            ";
5196            tls::try_get(|store| {
5197                let matched = match store {
5198                    tls::TryGet::Some(store) => store.id() == id,
5199                    tls::TryGet::Taken | tls::TryGet::None => false,
5200                };
5201
5202                if !matched {
5203                    panic!("{message}")
5204                }
5205            });
5206            fut.as_mut().poll(cx)
5207        })
5208        .await
5209    }
5210}
5211
5212/// Assert that `StoreContextMut::run_concurrent` has not been called from
5213/// within an store's event loop.
5214fn check_recursive_run() {
5215    tls::try_get(|store| {
5216        if !matches!(store, tls::TryGet::None) {
5217            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5218        }
5219    });
5220}
5221
5222fn unpack_callback_code(code: u32) -> (u32, u32) {
5223    (code & 0xF, code >> 4)
5224}
5225
5226/// Helper struct for packaging parameters to be passed to
5227/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5228/// `waitable-set.poll`.
5229struct WaitableCheckParams {
5230    set: TableId<WaitableSet>,
5231    options: OptionsIndex,
5232    payload: u32,
5233}
5234
5235/// Indicates whether `ComponentInstance::waitable_check` is being called for
5236/// `waitable-set.wait` or `waitable-set.poll`.
5237enum WaitableCheck {
5238    Wait,
5239    Poll,
5240}
5241
5242/// Represents a guest task called from the host, prepared using `prepare_call`.
5243pub(crate) struct PreparedCall<R> {
5244    /// The guest export to be called
5245    handle: Func,
5246    /// The guest thread created by `prepare_call`
5247    thread: QualifiedThreadId,
5248    /// The number of lowered core Wasm parameters to pass to the call.
5249    param_count: usize,
5250    /// The `oneshot::Receiver` to which the result of the call will be
5251    /// delivered when it is available.
5252    rx: oneshot::Receiver<LiftedResult>,
5253    /// The `oneshot::Receiver` which will resolve when the task -- and any
5254    /// transitive subtasks -- have all exited.
5255    exit_rx: oneshot::Receiver<()>,
5256    _phantom: PhantomData<R>,
5257}
5258
5259impl<R> PreparedCall<R> {
5260    /// Get a copy of the `TaskId` for this `PreparedCall`.
5261    pub(crate) fn task_id(&self) -> TaskId {
5262        TaskId {
5263            task: self.thread.task,
5264        }
5265    }
5266}
5267
5268/// Represents a task created by `prepare_call`.
5269pub(crate) struct TaskId {
5270    task: TableId<GuestTask>,
5271}
5272
5273impl TaskId {
5274    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5275    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5276    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5277    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5278    /// and delete the task when all threads are done.
5279    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5280        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5281        if !task.already_lowered_parameters() {
5282            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5283        } else {
5284            task.host_future_state = HostFutureState::Dropped;
5285            if task.ready_to_delete() {
5286                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5287            }
5288        }
5289        Ok(())
5290    }
5291}
5292
5293/// Prepare a call to the specified exported Wasm function, providing functions
5294/// for lowering the parameters and lifting the result.
5295///
5296/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5297/// loop, use `queue_call`.
5298pub(crate) fn prepare_call<T, R>(
5299    mut store: StoreContextMut<T>,
5300    handle: Func,
5301    param_count: usize,
5302    host_future_present: bool,
5303    call_post_return_automatically: bool,
5304    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5305    + Send
5306    + Sync
5307    + 'static,
5308    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5309    + Send
5310    + Sync
5311    + 'static,
5312) -> Result<PreparedCall<R>> {
5313    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5314
5315    let instance = handle.instance().id().get(store.0);
5316    let options = &instance.component().env_component().options[options];
5317    let ty = &instance.component().types()[ty];
5318    let async_function = ty.async_;
5319    let task_return_type = ty.results;
5320    let component_instance = raw_options.instance;
5321    let callback = options.callback.map(|i| instance.runtime_callback(i));
5322    let memory = options
5323        .memory()
5324        .map(|i| instance.runtime_memory(i))
5325        .map(SendSyncPtr::new);
5326    let string_encoding = options.string_encoding;
5327    let token = StoreToken::new(store.as_context_mut());
5328    let state = store.0.concurrent_state_mut();
5329
5330    let (tx, rx) = oneshot::channel();
5331    let (exit_tx, exit_rx) = oneshot::channel();
5332
5333    let caller = state.guest_thread;
5334    let mut task = GuestTask::new(
5335        state,
5336        Box::new(for_any_lower(move |store, params| {
5337            lower_params(handle, token.as_context_mut(store), params)
5338        })),
5339        LiftResult {
5340            lift: Box::new(for_any_lift(move |store, result| {
5341                lift_result(handle, store, result)
5342            })),
5343            ty: task_return_type,
5344            memory,
5345            string_encoding,
5346        },
5347        Caller::Host {
5348            tx: Some(tx),
5349            exit_tx: Arc::new(exit_tx),
5350            host_future_present,
5351            call_post_return_automatically,
5352            caller,
5353        },
5354        callback.map(|callback| {
5355            let callback = SendSyncPtr::new(callback);
5356            let instance = handle.instance();
5357            Box::new(move |store: &mut dyn VMStore, event, handle| {
5358                let store = token.as_context_mut(store);
5359                // SAFETY: Per the contract of `prepare_call`, the callback
5360                // will remain valid at least as long is this task exists.
5361                unsafe { instance.call_callback(store, callback, event, handle) }
5362            }) as CallbackFn
5363        }),
5364        RuntimeInstance {
5365            instance: handle.instance().id().instance(),
5366            index: component_instance,
5367        },
5368        async_function,
5369    )?;
5370    task.function_index = Some(handle.index());
5371
5372    let task = state.push(task)?;
5373    let thread = state.push(GuestThread::new_implicit(task))?;
5374    state.get_mut(task)?.threads.insert(thread);
5375
5376    if !store.0.may_enter_task(task) {
5377        bail!(crate::Trap::CannotEnterComponent);
5378    }
5379
5380    Ok(PreparedCall {
5381        handle,
5382        thread: QualifiedThreadId { task, thread },
5383        param_count,
5384        rx,
5385        exit_rx,
5386        _phantom: PhantomData,
5387    })
5388}
5389
5390/// Queue a call previously prepared using `prepare_call` to be run as part of
5391/// the associated `ComponentInstance`'s event loop.
5392///
5393/// The returned future will resolve to the result once it is available, but
5394/// must only be polled via the instance's event loop. See
5395/// `StoreContextMut::run_concurrent` for details.
5396pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5397    mut store: StoreContextMut<T>,
5398    prepared: PreparedCall<R>,
5399) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5400    let PreparedCall {
5401        handle,
5402        thread,
5403        param_count,
5404        rx,
5405        exit_rx,
5406        ..
5407    } = prepared;
5408
5409    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5410
5411    Ok(checked(
5412        store.0.id(),
5413        rx.map(move |result| {
5414            result
5415                .map(|v| (*v.downcast().unwrap(), exit_rx))
5416                .map_err(crate::Error::from)
5417        }),
5418    ))
5419}
5420
5421/// Queue a call previously prepared using `prepare_call` to be run as part of
5422/// the associated `ComponentInstance`'s event loop.
5423fn queue_call0<T: 'static>(
5424    store: StoreContextMut<T>,
5425    handle: Func,
5426    guest_thread: QualifiedThreadId,
5427    param_count: usize,
5428) -> Result<()> {
5429    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5430    let is_concurrent = raw_options.async_;
5431    let callback = raw_options.callback;
5432    let instance = handle.instance();
5433    let callee = handle.lifted_core_func(store.0);
5434    let post_return = handle.post_return_core_func(store.0);
5435    let callback = callback.map(|i| {
5436        let instance = instance.id().get(store.0);
5437        SendSyncPtr::new(instance.runtime_callback(i))
5438    });
5439
5440    log::trace!("queueing call {guest_thread:?}");
5441
5442    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5443    // (with signatures appropriate for this call) and will remain valid as
5444    // long as this instance is valid.
5445    unsafe {
5446        instance.queue_call(
5447            store,
5448            guest_thread,
5449            SendSyncPtr::new(callee),
5450            param_count,
5451            1,
5452            is_concurrent,
5453            callback,
5454            post_return.map(SendSyncPtr::new),
5455        )
5456    }
5457}