wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted import, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
58use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
59use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
60use anyhow::{Context as _, Result, anyhow, bail};
61use error_contexts::GlobalErrorContextRefCount;
62use futures::channel::oneshot;
63use futures::future::{self, Either, FutureExt};
64use futures::stream::{FuturesUnordered, StreamExt};
65use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
66use std::any::Any;
67use std::borrow::ToOwned;
68use std::boxed::Box;
69use std::cell::UnsafeCell;
70use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
71use std::fmt;
72use std::future::Future;
73use std::marker::PhantomData;
74use std::mem::{self, ManuallyDrop, MaybeUninit};
75use std::ops::DerefMut;
76use std::pin::{Pin, pin};
77use std::ptr::{self, NonNull};
78use std::slice;
79use std::sync::Arc;
80use std::task::{Context, Poll, Waker};
81use std::vec::Vec;
82use table::{TableDebug, TableId};
83use wasmtime_environ::Trap;
84use wasmtime_environ::component::{
85    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
86    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
87    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
88    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
89    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
90};
91
92pub use abort::JoinHandle;
93pub use futures_and_streams::{
94    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
95    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
96    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
97};
98pub(crate) use futures_and_streams::{
99    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
100};
101
102mod abort;
103mod error_contexts;
104mod futures_and_streams;
105pub(crate) mod table;
106pub(crate) mod tls;
107
108/// Constant defined in the Component Model spec to indicate that the async
109/// intrinsic (e.g. `future.write`) has not yet completed.
110const BLOCKED: u32 = 0xffff_ffff;
111
112/// Corresponds to `CallState` in the upstream spec.
113#[derive(Clone, Copy, Eq, PartialEq, Debug)]
114pub enum Status {
115    Starting = 0,
116    Started = 1,
117    Returned = 2,
118    StartCancelled = 3,
119    ReturnCancelled = 4,
120}
121
122impl Status {
123    /// Packs this status and the optional `waitable` provided into a 32-bit
124    /// result that the canonical ABI requires.
125    ///
126    /// The low 4 bits are reserved for the status while the upper 28 bits are
127    /// the waitable, if present.
128    pub fn pack(self, waitable: Option<u32>) -> u32 {
129        assert!(matches!(self, Status::Returned) == waitable.is_none());
130        let waitable = waitable.unwrap_or(0);
131        assert!(waitable < (1 << 28));
132        (waitable << 4) | (self as u32)
133    }
134}
135
136/// Corresponds to `EventCode` in the Component Model spec, plus related payload
137/// data.
138#[derive(Clone, Copy, Debug)]
139enum Event {
140    None,
141    Cancelled,
142    Subtask {
143        status: Status,
144    },
145    StreamRead {
146        code: ReturnCode,
147        pending: Option<(TypeStreamTableIndex, u32)>,
148    },
149    StreamWrite {
150        code: ReturnCode,
151        pending: Option<(TypeStreamTableIndex, u32)>,
152    },
153    FutureRead {
154        code: ReturnCode,
155        pending: Option<(TypeFutureTableIndex, u32)>,
156    },
157    FutureWrite {
158        code: ReturnCode,
159        pending: Option<(TypeFutureTableIndex, u32)>,
160    },
161}
162
163impl Event {
164    /// Lower this event to core Wasm integers for delivery to the guest.
165    ///
166    /// Note that the waitable handle, if any, is assumed to be lowered
167    /// separately.
168    fn parts(self) -> (u32, u32) {
169        const EVENT_NONE: u32 = 0;
170        const EVENT_SUBTASK: u32 = 1;
171        const EVENT_STREAM_READ: u32 = 2;
172        const EVENT_STREAM_WRITE: u32 = 3;
173        const EVENT_FUTURE_READ: u32 = 4;
174        const EVENT_FUTURE_WRITE: u32 = 5;
175        const EVENT_CANCELLED: u32 = 6;
176        match self {
177            Event::None => (EVENT_NONE, 0),
178            Event::Cancelled => (EVENT_CANCELLED, 0),
179            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
180            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
181            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
182            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
183            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
184        }
185    }
186}
187
188/// Corresponds to `CallbackCode` in the spec.
189mod callback_code {
190    pub const EXIT: u32 = 0;
191    pub const YIELD: u32 = 1;
192    pub const WAIT: u32 = 2;
193    pub const POLL: u32 = 3;
194}
195
196/// A flag indicating that the callee is an async-lowered export.
197///
198/// This may be passed to the `async-start` intrinsic from a fused adapter.
199const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
200
201/// Provides access to either store data (via the `get` method) or the store
202/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
203/// instance to which the current host task belongs.
204///
205/// See [`Accessor::with`] for details.
206pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
207    store: StoreContextMut<'a, T>,
208    get_data: fn(&mut T) -> D::Data<'_>,
209}
210
211impl<'a, T, D> Access<'a, T, D>
212where
213    D: HasData + ?Sized,
214    T: 'static,
215{
216    /// Creates a new [`Access`] from its component parts.
217    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
218        Self { store, get_data }
219    }
220
221    /// Get mutable access to the store data.
222    pub fn data_mut(&mut self) -> &mut T {
223        self.store.data_mut()
224    }
225
226    /// Get mutable access to the store data.
227    pub fn get(&mut self) -> D::Data<'_> {
228        (self.get_data)(self.data_mut())
229    }
230
231    /// Spawn a background task.
232    ///
233    /// See [`Accessor::spawn`] for details.
234    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
235    where
236        T: 'static,
237    {
238        let accessor = Accessor {
239            get_data: self.get_data,
240            token: StoreToken::new(self.store.as_context_mut()),
241        };
242        self.store
243            .as_context_mut()
244            .spawn_with_accessor(accessor, task)
245    }
246}
247
248impl<'a, T, D> AsContext for Access<'a, T, D>
249where
250    D: HasData + ?Sized,
251    T: 'static,
252{
253    type Data = T;
254
255    fn as_context(&self) -> StoreContext<'_, T> {
256        self.store.as_context()
257    }
258}
259
260impl<'a, T, D> AsContextMut for Access<'a, T, D>
261where
262    D: HasData + ?Sized,
263    T: 'static,
264{
265    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
266        self.store.as_context_mut()
267    }
268}
269
270/// Provides scoped mutable access to store data in the context of a concurrent
271/// host task future.
272///
273/// This allows multiple host task futures to execute concurrently and access
274/// the store between (but not across) `await` points.
275///
276/// # Rationale
277///
278/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
279/// `D::Data<'_>`. The problem this is solving, however, is that it does not
280/// literally store these values. The basic problem is that when a concurrent
281/// host future is being polled it has access to `&mut T` (and the whole
282/// `Store`) but when it's not being polled it does not have access to these
283/// values. This reflects how the store is only ever polling one future at a
284/// time so the store is effectively being passed between futures.
285///
286/// Rust's `Future` trait, however, has no means of passing a `Store`
287/// temporarily between futures. The [`Context`](std::task::Context) type does
288/// not have the ability to attach arbitrary information to it at this time.
289/// This type, [`Accessor`], is used to bridge this expressivity gap.
290///
291/// The [`Accessor`] type here represents the ability to acquire, temporarily in
292/// a synchronous manner, the current store. The [`Accessor::with`] function
293/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
294/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
295/// not take an `async` closure as its argument, instead it's a synchronous
296/// closure which must complete during on run of `Future::poll`. This reflects
297/// how the store is temporarily made available while a host future is being
298/// polled.
299///
300/// # Implementation
301///
302/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
303/// this type additionally doesn't even have a lifetime parameter. This is
304/// instead a representation of proof of the ability to acquire these while a
305/// future is being polled. Wasmtime will, when it polls a host future,
306/// configure ambient state such that the `Accessor` that a future closes over
307/// will work and be able to access the store.
308///
309/// This has a number of implications for users such as:
310///
311/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
312///   the lifetime of a single future.
313/// * A future is expected to, however, close over an `Accessor` and keep it
314///   alive probably for the duration of the entire future.
315/// * Different host futures will be given different `Accessor`s, and that's
316///   intentional.
317/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
318///   alleviates some otherwise required bounds to be written down.
319///
320/// # Using `Accessor` in `Drop`
321///
322/// The methods on `Accessor` are only expected to work in the context of
323/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
324/// host future can be dropped at any time throughout the system and Wasmtime
325/// store context is not necessarily available at that time. It's recommended to
326/// not use `Accessor` methods in anything connected to a `Drop` implementation
327/// as they will panic and have unintended results. If you run into this though
328/// feel free to file an issue on the Wasmtime repository.
329pub struct Accessor<T: 'static, D = HasSelf<T>>
330where
331    D: HasData + ?Sized,
332{
333    token: StoreToken<T>,
334    get_data: fn(&mut T) -> D::Data<'_>,
335}
336
337/// A helper trait to take any type of accessor-with-data in functions.
338///
339/// This trait is similar to [`AsContextMut`] except that it's used when
340/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
341/// [`Accessor`] is the main type used in concurrent settings and is passed to
342/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
343///
344/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
345/// this trait. This effectively means that regardless of the `D` in
346/// `Accessor<T, D>` it can still be passed to a function which just needs a
347/// store accessor.
348///
349/// Acquiring an [`Accessor`] can be done through
350/// [`StoreContextMut::run_concurrent`] for example or in a host function
351/// through
352/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
353pub trait AsAccessor {
354    /// The `T` in `Store<T>` that this accessor refers to.
355    type Data: 'static;
356
357    /// The `D` in `Accessor<T, D>`, or the projection out of
358    /// `Self::Data`.
359    type AccessorData: HasData + ?Sized;
360
361    /// Returns the accessor that this is referring to.
362    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
363}
364
365impl<T: AsAccessor + ?Sized> AsAccessor for &T {
366    type Data = T::Data;
367    type AccessorData = T::AccessorData;
368
369    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
370        T::as_accessor(self)
371    }
372}
373
374impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
375    type Data = T;
376    type AccessorData = D;
377
378    fn as_accessor(&self) -> &Accessor<T, D> {
379        self
380    }
381}
382
383// Note that it is intentional at this time that `Accessor` does not actually
384// store `&mut T` or anything similar. This distinctly enables the `Accessor`
385// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
386// that matter). This is used to ergonomically simplify bindings where the
387// majority of the time `Accessor` is closed over in a future which then needs
388// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
389// you already have to write `T: 'static`...) it helps to avoid this.
390//
391// Note as well that `Accessor` doesn't actually store its data at all. Instead
392// it's more of a "proof" of what can be accessed from TLS. API design around
393// `Accessor` and functions like `Linker::func_wrap_concurrent` are
394// intentionally made to ensure that `Accessor` is ideally only used in the
395// context that TLS variables are actually set. For example host functions are
396// given `&Accessor`, not `Accessor`, and this prevents them from persisting
397// the value outside of a future. Within the future the TLS variables are all
398// guaranteed to be set while the future is being polled.
399//
400// Finally though this is not an ironclad guarantee, but nor does it need to be.
401// The TLS APIs are designed to panic or otherwise model usage where they're
402// called recursively or similar. It's hoped that code cannot be constructed to
403// actually hit this at runtime but this is not a safety requirement at this
404// time.
405const _: () = {
406    const fn assert<T: Send + Sync>() {}
407    assert::<Accessor<UnsafeCell<u32>>>();
408};
409
410impl<T> Accessor<T> {
411    /// Creates a new `Accessor` backed by the specified functions.
412    ///
413    /// - `get`: used to retrieve the store
414    ///
415    /// - `get_data`: used to "project" from the store's associated data to
416    /// another type (e.g. a field of that data or a wrapper around it).
417    ///
418    /// - `spawn`: used to queue spawned background tasks to be run later
419    pub(crate) fn new(token: StoreToken<T>) -> Self {
420        Self {
421            token,
422            get_data: |x| x,
423        }
424    }
425}
426
427impl<T, D> Accessor<T, D>
428where
429    D: HasData + ?Sized,
430{
431    /// Run the specified closure, passing it mutable access to the store.
432    ///
433    /// This function is one of the main building blocks of the [`Accessor`]
434    /// type. This yields synchronous, blocking, access to store via an
435    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
436    /// providing the ability to access `D` via [`Access::get`]. Note that the
437    /// `fun` here is given only temporary access to the store and `T`/`D`
438    /// meaning that the return value `R` here is not allowed to capture borrows
439    /// into the two. If access is needed to data within `T` or `D` outside of
440    /// this closure then it must be `clone`d out, for example.
441    ///
442    /// # Panics
443    ///
444    /// This function will panic if it is call recursively with any other
445    /// accessor already in scope. For example if `with` is called within `fun`,
446    /// then this function will panic. It is up to the embedder to ensure that
447    /// this does not happen.
448    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
449        tls::get(|vmstore| {
450            fun(Access {
451                store: self.token.as_context_mut(vmstore),
452                get_data: self.get_data,
453            })
454        })
455    }
456
457    /// Returns the getter this accessor is using to project from `T` into
458    /// `D::Data`.
459    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
460        self.get_data
461    }
462
463    /// Changes this accessor to access `D2` instead of the current type
464    /// parameter `D`.
465    ///
466    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
467    ///
468    /// # Panics
469    ///
470    /// When using this API the returned value is disconnected from `&self` and
471    /// the lifetime binding the `self` argument. An `Accessor` only works
472    /// within the context of the closure or async closure that it was
473    /// originally given to, however. This means that due to the fact that the
474    /// returned value has no lifetime connection it's possible to use the
475    /// accessor outside of `&self`, the original accessor, and panic.
476    ///
477    /// The returned value should only be used within the scope of the original
478    /// `Accessor` that `self` refers to.
479    pub fn with_getter<D2: HasData>(
480        &self,
481        get_data: fn(&mut T) -> D2::Data<'_>,
482    ) -> Accessor<T, D2> {
483        Accessor {
484            token: self.token,
485            get_data,
486        }
487    }
488
489    /// Spawn a background task which will receive an `&Accessor<T, D>` and
490    /// run concurrently with any other tasks in progress for the current
491    /// store.
492    ///
493    /// This is particularly useful for host functions which return a `stream`
494    /// or `future` such that the code to write to the write end of that
495    /// `stream` or `future` must run after the function returns.
496    ///
497    /// The returned [`JoinHandle`] may be used to cancel the task.
498    ///
499    /// # Panics
500    ///
501    /// Panics if called within a closure provided to the [`Accessor::with`]
502    /// function. This can only be called outside an active invocation of
503    /// [`Accessor::with`].
504    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
505    where
506        T: 'static,
507    {
508        let accessor = self.clone_for_spawn();
509        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
510    }
511
512    fn clone_for_spawn(&self) -> Self {
513        Self {
514            token: self.token,
515            get_data: self.get_data,
516        }
517    }
518}
519
520/// Represents a task which may be provided to `Accessor::spawn`,
521/// `Accessor::forward`, or `StorecContextMut::spawn`.
522// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
523// option.
524//
525// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
526// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
527// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
528// Send + Sync + 'static` fails with a type mismatch error when we try to pass
529// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
530// best we can do for the time being.
531pub trait AccessorTask<T, D, R>: Send + 'static
532where
533    D: HasData + ?Sized,
534{
535    /// Run the task.
536    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
537}
538
539/// Represents parameter and result metadata for the caller side of a
540/// guest->guest call orchestrated by a fused adapter.
541enum CallerInfo {
542    /// Metadata for a call to an async-lowered import
543    Async {
544        params: Vec<ValRaw>,
545        has_result: bool,
546    },
547    /// Metadata for a call to an sync-lowered import
548    Sync {
549        params: Vec<ValRaw>,
550        result_count: u32,
551    },
552}
553
554/// Indicates how a guest task is waiting on a waitable set.
555enum WaitMode {
556    /// The guest task is waiting using `task.wait`
557    Fiber(StoreFiber<'static>),
558    /// The guest task is waiting via a callback declared as part of an
559    /// async-lifted export.
560    Callback(Instance),
561}
562
563/// Represents the reason a fiber is suspending itself.
564#[derive(Debug)]
565enum SuspendReason {
566    /// The fiber is waiting for an event to be delivered to the specified
567    /// waitable set or task.
568    Waiting {
569        set: TableId<WaitableSet>,
570        thread: QualifiedThreadId,
571    },
572    /// The fiber has finished handling its most recent work item and is waiting
573    /// for another (or to be dropped if it is no longer needed).
574    NeedWork,
575    /// The fiber is yielding and should be resumed once other tasks have had a
576    /// chance to run.
577    Yielding { thread: QualifiedThreadId },
578    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
579    ExplicitlySuspending { thread: QualifiedThreadId },
580}
581
582/// Represents a pending call into guest code for a given guest task.
583enum GuestCallKind {
584    /// Indicates there's an event to deliver to the task, possibly related to a
585    /// waitable set the task has been waiting on or polling.
586    DeliverEvent {
587        /// The instance to which the task belongs.
588        instance: Instance,
589        /// The waitable set the event belongs to, if any.
590        ///
591        /// If this is `None` the event will be waiting in the
592        /// `GuestTask::event` field for the task.
593        set: Option<TableId<WaitableSet>>,
594    },
595    /// Indicates that a new guest task call is pending and may be executed
596    /// using the specified closure.
597    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
598    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
599}
600
601impl fmt::Debug for GuestCallKind {
602    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
603        match self {
604            Self::DeliverEvent { instance, set } => f
605                .debug_struct("DeliverEvent")
606                .field("instance", instance)
607                .field("set", set)
608                .finish(),
609            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
610            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
611        }
612    }
613}
614
615/// Represents a pending call into guest code for a given guest thread.
616#[derive(Debug)]
617struct GuestCall {
618    thread: QualifiedThreadId,
619    kind: GuestCallKind,
620}
621
622impl GuestCall {
623    /// Returns whether or not the call is ready to run.
624    ///
625    /// A call will not be ready to run if either:
626    ///
627    /// - the (sub-)component instance to be called has already been entered and
628    /// cannot be reentered until an in-progress call completes
629    ///
630    /// - the call is for a not-yet started task and the (sub-)component
631    /// instance to be called has backpressure enabled
632    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
633        let task_instance = state.get_mut(self.thread.task)?.instance;
634        let state = state.instance_state(task_instance);
635
636        let ready = match &self.kind {
637            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
638            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
639            GuestCallKind::StartExplicit(_) => true,
640        };
641        log::trace!(
642            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
643            state.do_not_enter,
644            state.backpressure
645        );
646        Ok(ready)
647    }
648}
649
650/// Job to be run on a worker fiber.
651enum WorkerItem {
652    GuestCall(GuestCall),
653    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
654}
655
656/// Represents state related to an in-progress poll operation (e.g. `task.poll`
657/// or `CallbackCode.POLL`).
658#[derive(Debug)]
659struct PollParams {
660    /// The instance to which the polling thread belongs.
661    instance: Instance,
662    /// The polling thread.
663    thread: QualifiedThreadId,
664    /// The waitable set being polled.
665    set: TableId<WaitableSet>,
666}
667
668/// Represents a pending work item to be handled by the event loop for a given
669/// component instance.
670enum WorkItem {
671    /// A host task to be pushed to `ConcurrentState::futures`.
672    PushFuture(AlwaysMut<HostTaskFuture>),
673    /// A fiber to resume.
674    ResumeFiber(StoreFiber<'static>),
675    /// A pending call into guest code for a given guest task.
676    GuestCall(GuestCall),
677    /// A pending `task.poll` or `CallbackCode.POLL` operation.
678    Poll(PollParams),
679    /// A job to run on a worker fiber.
680    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
681}
682
683impl fmt::Debug for WorkItem {
684    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
685        match self {
686            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
687            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
688            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
689            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
690            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
691        }
692    }
693}
694
695/// Whether a suspension intrinsic was cancelled or completed
696#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
697pub(crate) enum WaitResult {
698    Cancelled,
699    Completed,
700}
701
702/// Poll the specified future until it completes on behalf of a guest->host call
703/// using a sync-lowered import.
704///
705/// This is similar to `Instance::first_poll` except it's for sync-lowered
706/// imports, meaning we don't need to handle cancellation and we can block the
707/// caller until the task completes, at which point the caller can handle
708/// lowering the result to the guest's stack and linear memory.
709pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
710    store: &mut dyn VMStore,
711    future: impl Future<Output = Result<R>> + Send + 'static,
712    caller_instance: RuntimeComponentInstanceIndex,
713) -> Result<R> {
714    let state = store.concurrent_state_mut();
715
716    // If there is no current guest thread set, that means the host function was
717    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
718    // should complete immediately.
719    let Some(caller) = state.guest_thread else {
720        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
721            Poll::Ready(result) => result,
722            Poll::Pending => {
723                unreachable!()
724            }
725        };
726    };
727
728    // Save any existing result stashed in `GuestTask::result` so we can replace
729    // it with the new result.
730    let old_result = state
731        .get_mut(caller.task)
732        .with_context(|| format!("bad handle: {caller:?}"))?
733        .result
734        .take();
735
736    // Add a temporary host task into the table so we can track its progress.
737    // Note that we'll never allocate a waitable handle for the guest since
738    // we're being called synchronously.
739    let task = state.push(HostTask::new(caller_instance, None))?;
740
741    log::trace!("new host task child of {caller:?}: {task:?}");
742
743    // Wrap the future in a closure which will take care of stashing the result
744    // in `GuestTask::result` and resuming this fiber when the host task
745    // completes.
746    let mut future = Box::pin(async move {
747        let result = future.await?;
748        tls::get(move |store| {
749            let state = store.concurrent_state_mut();
750            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
751
752            Waitable::Host(task).set_event(
753                state,
754                Some(Event::Subtask {
755                    status: Status::Returned,
756                }),
757            )?;
758
759            Ok(())
760        })
761    }) as HostTaskFuture;
762
763    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
764    // add the future to `ConcurrentState::futures` and poll it automatically
765    // from the event loop if it doesn't complete immediately here.
766    let poll = tls::set(store, || {
767        future
768            .as_mut()
769            .poll(&mut Context::from_waker(&Waker::noop()))
770    });
771
772    match poll {
773        Poll::Ready(result) => {
774            // It completed immediately; check the result and delete the task.
775            result?;
776            log::trace!("delete host task {task:?} (already ready)");
777            store.concurrent_state_mut().delete(task)?;
778        }
779        Poll::Pending => {
780            // It did not complete immediately; add it to
781            // `ConcurrentState::futures` so it will be polled via the event
782            // loop; then use `GuestTask::sync_call_set` to wait for the task to
783            // complete, suspending the current fiber until it does so.
784            let state = store.concurrent_state_mut();
785            state.push_future(future);
786
787            let set = state.get_mut(caller.task)?.sync_call_set;
788            Waitable::Host(task).join(state, Some(set))?;
789
790            store.suspend(SuspendReason::Waiting {
791                set,
792                thread: caller,
793            })?;
794        }
795    }
796
797    // Retrieve and return the result.
798    Ok(*mem::replace(
799        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
800        old_result,
801    )
802    .unwrap()
803    .downcast()
804    .unwrap())
805}
806
807/// Execute the specified guest call.
808fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
809    match call.kind {
810        GuestCallKind::DeliverEvent { instance, set } => {
811            let (event, waitable) = instance
812                .get_event(store, call.thread.task, set, true)?
813                .unwrap();
814            let state = store.concurrent_state_mut();
815            let task = state.get_mut(call.thread.task)?;
816            let runtime_instance = task.instance;
817            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
818
819            log::trace!(
820                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
821                call.thread,
822            );
823
824            let old_thread = state.guest_thread.replace(call.thread);
825            log::trace!(
826                "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
827                call.thread
828            );
829
830            store.maybe_push_call_context(call.thread.task)?;
831
832            let state = store.concurrent_state_mut();
833            state.enter_instance(runtime_instance);
834
835            let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
836
837            let code = callback(store, runtime_instance, event, handle)?;
838
839            let state = store.concurrent_state_mut();
840
841            state.get_mut(call.thread.task)?.callback = Some(callback);
842            state.exit_instance(runtime_instance)?;
843
844            store.maybe_pop_call_context(call.thread.task)?;
845
846            instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
847
848            store.concurrent_state_mut().guest_thread = old_thread;
849            log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
850        }
851        GuestCallKind::StartImplicit(fun) => {
852            fun(store)?;
853        }
854        GuestCallKind::StartExplicit(fun) => {
855            fun(store)?;
856        }
857    }
858
859    Ok(())
860}
861
862impl<T> Store<T> {
863    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
864    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
865    where
866        T: Send + 'static,
867    {
868        self.as_context_mut().run_concurrent(fun).await
869    }
870
871    #[doc(hidden)]
872    pub fn assert_concurrent_state_empty(&mut self) {
873        self.as_context_mut().assert_concurrent_state_empty();
874    }
875
876    /// Convenience wrapper for [`StoreContextMut::spawn`].
877    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
878    where
879        T: 'static,
880    {
881        self.as_context_mut().spawn(task)
882    }
883}
884
885impl<T> StoreContextMut<'_, T> {
886    /// Assert that all the relevant tables and queues in the concurrent state
887    /// for this store are empty.
888    ///
889    /// This is for sanity checking in integration tests
890    /// (e.g. `component-async-tests`) that the relevant state has been cleared
891    /// after each test concludes.  This should help us catch leaks, e.g. guest
892    /// tasks which haven't been deleted despite having completed and having
893    /// been dropped by their supertasks.
894    #[doc(hidden)]
895    pub fn assert_concurrent_state_empty(self) {
896        let store = self.0;
897        store
898            .store_data_mut()
899            .components
900            .assert_guest_tables_empty();
901        let state = store.concurrent_state_mut();
902        assert!(
903            state.table.get_mut().is_empty(),
904            "non-empty table: {:?}",
905            state.table.get_mut()
906        );
907        assert!(state.high_priority.is_empty());
908        assert!(state.low_priority.is_empty());
909        assert!(state.guest_thread.is_none());
910        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
911        assert!(
912            state
913                .instance_states
914                .iter()
915                .all(|(_, state)| state.pending.is_empty())
916        );
917        assert!(state.global_error_context_ref_counts.is_empty());
918    }
919
920    /// Spawn a background task to run as part of this instance's event loop.
921    ///
922    /// The task will receive an `&Accessor<U>` and run concurrently with
923    /// any other tasks in progress for the instance.
924    ///
925    /// Note that the task will only make progress if and when the event loop
926    /// for this instance is run.
927    ///
928    /// The returned [`SpawnHandle`] may be used to cancel the task.
929    pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
930    where
931        T: 'static,
932    {
933        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
934        self.spawn_with_accessor(accessor, task)
935    }
936
937    /// Internal implementation of `spawn` functions where a `store` is
938    /// available along with an `Accessor`.
939    fn spawn_with_accessor<D>(
940        self,
941        accessor: Accessor<T, D>,
942        task: impl AccessorTask<T, D, Result<()>>,
943    ) -> JoinHandle
944    where
945        T: 'static,
946        D: HasData + ?Sized,
947    {
948        // Create an "abortable future" here where internally the future will
949        // hook calls to poll and possibly spawn more background tasks on each
950        // iteration.
951        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
952        self.0
953            .concurrent_state_mut()
954            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
955        handle
956    }
957
958    /// Run the specified closure `fun` to completion as part of this store's
959    /// event loop.
960    ///
961    /// This will run `fun` as part of this store's event loop until it
962    /// yields a result.  `fun` is provided an [`Accessor`], which provides
963    /// controlled access to the store and its data.
964    ///
965    /// This function can be used to invoke [`Func::call_concurrent`] for
966    /// example within the async closure provided here.
967    ///
968    /// # Store-blocking behavior
969    ///
970    ///
971    ///
972    /// At this time there are certain situations in which the `Future` returned
973    /// by the `AsyncFnOnce` passed to this function will not be polled for an
974    /// extended period of time, despite one or more `Waker::wake` events having
975    /// occurred for the task to which it belongs.  This can manifest as the
976    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
977    /// the `Store` being held by e.g. a blocking host function, preventing the
978    /// `Future` from being polled. A canonical example of this is when the
979    /// `fun` provided to this function attempts to set a timeout for an
980    /// invocation of a wasm function. In this situation the async closure is
981    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
982    /// to elapse. At this time this setup will not always work and the timeout
983    /// may not reliably fire.
984    ///
985    /// This function will not block the current thread and as such is always
986    /// suitable to run in an `async` context, but the current implementation of
987    /// Wasmtime can lead to situations where a certain wasm computation is
988    /// required to make progress the closure to make progress. This is an
989    /// artifact of Wasmtime's historical implementation of `async` functions
990    /// and is the topic of [#11869] and [#11870]. In the timeout example from
991    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
992    /// progress for a readiness notification of (b) to get delivered.
993    ///
994    /// This effectively means that it's not possible to reliably perform a
995    /// "select" operation within the `fun` closure, which timeouts for example
996    /// are based on. Fixing this requires some relatively major refactoring
997    /// work within Wasmtime itself. This is a known pitfall otherwise and one
998    /// that is intended to be fixed one day. In the meantime it's recommended
999    /// to apply timeouts or such to the entire `run_concurrent` call itself
1000    /// rather than internally.
1001    ///
1002    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1003    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1004    ///
1005    /// # Example
1006    ///
1007    /// ```
1008    /// # use {
1009    /// #   anyhow::{Result},
1010    /// #   wasmtime::{
1011    /// #     component::{ Component, Linker, Resource, ResourceTable},
1012    /// #     Config, Engine, Store
1013    /// #   },
1014    /// # };
1015    /// #
1016    /// # struct MyResource(u32);
1017    /// # struct Ctx { table: ResourceTable }
1018    /// #
1019    /// # async fn foo() -> Result<()> {
1020    /// # let mut config = Config::new();
1021    /// # let engine = Engine::new(&config)?;
1022    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1023    /// # let mut linker = Linker::new(&engine);
1024    /// # let component = Component::new(&engine, "")?;
1025    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1026    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1027    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1028    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1029    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1030    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1031    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1032    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1033    ///    Ok(())
1034    /// }).await??;
1035    /// # Ok(())
1036    /// # }
1037    /// ```
1038    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1039    where
1040        T: Send + 'static,
1041    {
1042        self.do_run_concurrent(fun, false).await
1043    }
1044
1045    pub(super) async fn run_concurrent_trap_on_idle<R>(
1046        self,
1047        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1048    ) -> Result<R>
1049    where
1050        T: Send + 'static,
1051    {
1052        self.do_run_concurrent(fun, true).await
1053    }
1054
1055    async fn do_run_concurrent<R>(
1056        mut self,
1057        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1058        trap_on_idle: bool,
1059    ) -> Result<R>
1060    where
1061        T: Send + 'static,
1062    {
1063        check_recursive_run();
1064        let token = StoreToken::new(self.as_context_mut());
1065
1066        struct Dropper<'a, T: 'static, V> {
1067            store: StoreContextMut<'a, T>,
1068            value: ManuallyDrop<V>,
1069        }
1070
1071        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1072            fn drop(&mut self) {
1073                tls::set(self.store.0, || {
1074                    // SAFETY: Here we drop the value without moving it for the
1075                    // first and only time -- per the contract for `Drop::drop`,
1076                    // this code won't run again, and the `value` field will no
1077                    // longer be accessible.
1078                    unsafe { ManuallyDrop::drop(&mut self.value) }
1079                });
1080            }
1081        }
1082
1083        let accessor = &Accessor::new(token);
1084        let dropper = &mut Dropper {
1085            store: self,
1086            value: ManuallyDrop::new(fun(accessor)),
1087        };
1088        // SAFETY: We never move `dropper` nor its `value` field.
1089        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1090
1091        dropper
1092            .store
1093            .as_context_mut()
1094            .poll_until(future, trap_on_idle)
1095            .await
1096    }
1097
1098    /// Run this store's event loop.
1099    ///
1100    /// The returned future will resolve when the specified future completes or,
1101    /// if `trap_on_idle` is true, when the event loop can't make further
1102    /// progress.
1103    async fn poll_until<R>(
1104        mut self,
1105        mut future: Pin<&mut impl Future<Output = R>>,
1106        trap_on_idle: bool,
1107    ) -> Result<R>
1108    where
1109        T: Send + 'static,
1110    {
1111        struct Reset<'a, T: 'static> {
1112            store: StoreContextMut<'a, T>,
1113            futures: Option<FuturesUnordered<HostTaskFuture>>,
1114        }
1115
1116        impl<'a, T> Drop for Reset<'a, T> {
1117            fn drop(&mut self) {
1118                if let Some(futures) = self.futures.take() {
1119                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1120                }
1121            }
1122        }
1123
1124        loop {
1125            // Take `ConcurrentState::futures` out of the store so we can poll
1126            // it while also safely giving any of the futures inside access to
1127            // `self`.
1128            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1129            let mut reset = Reset {
1130                store: self.as_context_mut(),
1131                futures,
1132            };
1133            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1134
1135            let result = future::poll_fn(|cx| {
1136                // First, poll the future we were passed as an argument and
1137                // return immediately if it's ready.
1138                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1139                    return Poll::Ready(Ok(Either::Left(value)));
1140                }
1141
1142                // Next, poll `ConcurrentState::futures` (which includes any
1143                // pending host tasks and/or background tasks), returning
1144                // immediately if one of them fails.
1145                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1146                    Poll::Ready(Some(output)) => {
1147                        match output {
1148                            Err(e) => return Poll::Ready(Err(e)),
1149                            Ok(()) => {}
1150                        }
1151                        Poll::Ready(true)
1152                    }
1153                    Poll::Ready(None) => Poll::Ready(false),
1154                    Poll::Pending => Poll::Pending,
1155                };
1156
1157                // Next, check the "high priority" work queue and return
1158                // immediately if it has at least one item.
1159                let state = reset.store.0.concurrent_state_mut();
1160                let ready = mem::take(&mut state.high_priority);
1161                let ready = if ready.is_empty() {
1162                    // Next, check the "low priority" work queue and return
1163                    // immediately if it has at least one item.
1164                    let ready = mem::take(&mut state.low_priority);
1165                    if ready.is_empty() {
1166                        return match next {
1167                            Poll::Ready(true) => {
1168                                // In this case, one of the futures in
1169                                // `ConcurrentState::futures` completed
1170                                // successfully, so we return now and continue
1171                                // the outer loop in case there is another one
1172                                // ready to complete.
1173                                Poll::Ready(Ok(Either::Right(Vec::new())))
1174                            }
1175                            Poll::Ready(false) => {
1176                                // Poll the future we were passed one last time
1177                                // in case one of `ConcurrentState::futures` had
1178                                // the side effect of unblocking it.
1179                                if let Poll::Ready(value) =
1180                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
1181                                {
1182                                    Poll::Ready(Ok(Either::Left(value)))
1183                                } else {
1184                                    // In this case, there are no more pending
1185                                    // futures in `ConcurrentState::futures`,
1186                                    // there are no remaining work items, _and_
1187                                    // the future we were passed as an argument
1188                                    // still hasn't completed.
1189                                    if trap_on_idle {
1190                                        // `trap_on_idle` is true, so we exit
1191                                        // immediately.
1192                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1193                                    } else {
1194                                        // `trap_on_idle` is false, so we assume
1195                                        // that future will wake up and give us
1196                                        // more work to do when it's ready to.
1197                                        Poll::Pending
1198                                    }
1199                                }
1200                            }
1201                            // There is at least one pending future in
1202                            // `ConcurrentState::futures` and we have nothing
1203                            // else to do but wait for now, so we return
1204                            // `Pending`.
1205                            Poll::Pending => Poll::Pending,
1206                        };
1207                    } else {
1208                        ready
1209                    }
1210                } else {
1211                    ready
1212                };
1213
1214                Poll::Ready(Ok(Either::Right(ready)))
1215            })
1216            .await;
1217
1218            // Put the `ConcurrentState::futures` back into the store before we
1219            // return or handle any work items since one or more of those items
1220            // might append more futures.
1221            drop(reset);
1222
1223            match result? {
1224                // The future we were passed as an argument completed, so we
1225                // return the result.
1226                Either::Left(value) => break Ok(value),
1227                // The future we were passed has not yet completed, so handle
1228                // any work items and then loop again.
1229                Either::Right(ready) => {
1230                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1231                        store: StoreContextMut<'a, T>,
1232                        ready: I,
1233                    }
1234
1235                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1236                        fn drop(&mut self) {
1237                            while let Some(item) = self.ready.next() {
1238                                match item {
1239                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1240                                    WorkItem::PushFuture(future) => {
1241                                        tls::set(self.store.0, move || drop(future))
1242                                    }
1243                                    _ => {}
1244                                }
1245                            }
1246                        }
1247                    }
1248
1249                    let mut dispose = Dispose {
1250                        store: self.as_context_mut(),
1251                        ready: ready.into_iter(),
1252                    };
1253
1254                    while let Some(item) = dispose.ready.next() {
1255                        dispose
1256                            .store
1257                            .as_context_mut()
1258                            .handle_work_item(item)
1259                            .await?;
1260                    }
1261                }
1262            }
1263        }
1264    }
1265
1266    /// Handle the specified work item, possibly resuming a fiber if applicable.
1267    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1268    where
1269        T: Send,
1270    {
1271        log::trace!("handle work item {item:?}");
1272        match item {
1273            WorkItem::PushFuture(future) => {
1274                self.0
1275                    .concurrent_state_mut()
1276                    .futures
1277                    .get_mut()
1278                    .as_mut()
1279                    .unwrap()
1280                    .push(future.into_inner());
1281            }
1282            WorkItem::ResumeFiber(fiber) => {
1283                self.0.resume_fiber(fiber).await?;
1284            }
1285            WorkItem::GuestCall(call) => {
1286                let state = self.0.concurrent_state_mut();
1287                if call.is_ready(state)? {
1288                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1289                } else {
1290                    let task = state.get_mut(call.thread.task)?;
1291                    if !task.starting_sent {
1292                        task.starting_sent = true;
1293                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1294                            Waitable::Guest(call.thread.task).set_event(
1295                                state,
1296                                Some(Event::Subtask {
1297                                    status: Status::Starting,
1298                                }),
1299                            )?;
1300                        }
1301                    }
1302
1303                    let runtime_instance = state.get_mut(call.thread.task)?.instance;
1304                    state
1305                        .instance_state(runtime_instance)
1306                        .pending
1307                        .insert(call.thread, call.kind);
1308                }
1309            }
1310            WorkItem::Poll(params) => {
1311                let state = self.0.concurrent_state_mut();
1312                if state.get_mut(params.thread.task)?.event.is_some()
1313                    || !state.get_mut(params.set)?.ready.is_empty()
1314                {
1315                    // There's at least one event immediately available; deliver
1316                    // it to the guest ASAP.
1317                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1318                        thread: params.thread,
1319                        kind: GuestCallKind::DeliverEvent {
1320                            instance: params.instance,
1321                            set: Some(params.set),
1322                        },
1323                    }));
1324                } else {
1325                    // There are no events immediately available; deliver
1326                    // `Event::None` to the guest.
1327                    state.get_mut(params.thread.task)?.event = Some(Event::None);
1328                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1329                        thread: params.thread,
1330                        kind: GuestCallKind::DeliverEvent {
1331                            instance: params.instance,
1332                            set: Some(params.set),
1333                        },
1334                    }));
1335                }
1336            }
1337            WorkItem::WorkerFunction(fun) => {
1338                self.run_on_worker(WorkerItem::Function(fun)).await?;
1339            }
1340        }
1341
1342        Ok(())
1343    }
1344
1345    /// Execute the specified guest call on a worker fiber.
1346    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1347    where
1348        T: Send,
1349    {
1350        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1351            fiber
1352        } else {
1353            fiber::make_fiber(self.0, move |store| {
1354                loop {
1355                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1356                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1357                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1358                    }
1359
1360                    store.suspend(SuspendReason::NeedWork)?;
1361                }
1362            })?
1363        };
1364
1365        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1366        assert!(worker_item.is_none());
1367        *worker_item = Some(item);
1368
1369        self.0.resume_fiber(worker).await
1370    }
1371
1372    /// Wrap the specified host function in a future which will call it, passing
1373    /// it an `&Accessor<T>`.
1374    ///
1375    /// See the `Accessor` documentation for details.
1376    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1377    where
1378        T: 'static,
1379        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1380            + Send
1381            + Sync
1382            + 'static,
1383        R: Send + Sync + 'static,
1384    {
1385        let token = StoreToken::new(self);
1386        async move {
1387            let mut accessor = Accessor::new(token);
1388            closure(&mut accessor).await
1389        }
1390    }
1391}
1392
1393impl StoreOpaque {
1394    /// Resume the specified fiber, giving it exclusive access to the specified
1395    /// store.
1396    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1397        let old_thread = self.concurrent_state_mut().guest_thread;
1398        log::trace!("resume_fiber: save current thread {old_thread:?}");
1399
1400        let fiber = fiber::resolve_or_release(self, fiber).await?;
1401
1402        let state = self.concurrent_state_mut();
1403
1404        state.guest_thread = old_thread;
1405        if let Some(ref ot) = old_thread {
1406            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1407        }
1408        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1409
1410        if let Some(mut fiber) = fiber {
1411            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1412            // See the `SuspendReason` documentation for what each case means.
1413            match state.suspend_reason.take().unwrap() {
1414                SuspendReason::NeedWork => {
1415                    if state.worker.is_none() {
1416                        state.worker = Some(fiber);
1417                    } else {
1418                        fiber.dispose(self);
1419                    }
1420                }
1421                SuspendReason::Yielding { thread, .. } => {
1422                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1423                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1424                }
1425                SuspendReason::ExplicitlySuspending { thread, .. } => {
1426                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1427                }
1428                SuspendReason::Waiting { set, thread } => {
1429                    let old = state
1430                        .get_mut(set)?
1431                        .waiting
1432                        .insert(thread, WaitMode::Fiber(fiber));
1433                    assert!(old.is_none());
1434                }
1435            };
1436        } else {
1437            log::trace!("resume_fiber: fiber has exited");
1438        }
1439
1440        Ok(())
1441    }
1442
1443    /// Suspend the current fiber, storing the reason in
1444    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1445    /// it should be resumed.
1446    ///
1447    /// See the `SuspendReason` documentation for details.
1448    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1449        log::trace!("suspend fiber: {reason:?}");
1450
1451        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1452        // pop the call context which manages resource borrows before suspending
1453        // and then push it again once we've resumed.
1454        let task = match &reason {
1455            SuspendReason::Yielding { thread, .. }
1456            | SuspendReason::Waiting { thread, .. }
1457            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1458            SuspendReason::NeedWork => None,
1459        };
1460
1461        let old_guest_thread = if let Some(task) = task {
1462            self.maybe_pop_call_context(task)?;
1463            self.concurrent_state_mut().guest_thread
1464        } else {
1465            None
1466        };
1467
1468        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1469        assert!(suspend_reason.is_none());
1470        *suspend_reason = Some(reason);
1471
1472        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1473
1474        if let Some(task) = task {
1475            self.concurrent_state_mut().guest_thread = old_guest_thread;
1476            self.maybe_push_call_context(task)?;
1477        }
1478
1479        Ok(())
1480    }
1481
1482    /// Push the call context for managing resource borrows for the specified
1483    /// guest task if it has not yet either returned a result or cancelled
1484    /// itself.
1485    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1486        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1487
1488        if !task.returned_or_cancelled() {
1489            log::trace!("push call context for {guest_task:?}");
1490            let call_context = task.call_context.take().unwrap();
1491            self.component_resource_state().0.push(call_context);
1492        }
1493        Ok(())
1494    }
1495
1496    /// Pop the call context for managing resource borrows for the specified
1497    /// guest task if it has not yet either returned a result or cancelled
1498    /// itself.
1499    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1500        if !self
1501            .concurrent_state_mut()
1502            .get_mut(guest_task)?
1503            .returned_or_cancelled()
1504        {
1505            log::trace!("pop call context for {guest_task:?}");
1506            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1507            self.concurrent_state_mut()
1508                .get_mut(guest_task)?
1509                .call_context = call_context;
1510        }
1511        Ok(())
1512    }
1513
1514    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1515        let state = self.concurrent_state_mut();
1516        let caller = state.guest_thread.unwrap();
1517        let old_set = waitable.common(state)?.set;
1518        let set = state.get_mut(caller.task)?.sync_call_set;
1519        waitable.join(state, Some(set))?;
1520        self.suspend(SuspendReason::Waiting {
1521            set,
1522            thread: caller,
1523        })?;
1524        let state = self.concurrent_state_mut();
1525        waitable.join(state, old_set)
1526    }
1527}
1528
1529impl Instance {
1530    /// Get the next pending event for the specified task and (optional)
1531    /// waitable set, along with the waitable handle if applicable.
1532    fn get_event(
1533        self,
1534        store: &mut StoreOpaque,
1535        guest_task: TableId<GuestTask>,
1536        set: Option<TableId<WaitableSet>>,
1537        cancellable: bool,
1538    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1539        let state = store.concurrent_state_mut();
1540
1541        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1542            log::trace!("deliver event {event:?} to {guest_task:?}");
1543
1544            if cancellable || !matches!(event, Event::Cancelled) {
1545                return Ok(Some((event, None)));
1546            } else {
1547                state.get_mut(guest_task)?.event = Some(event);
1548            }
1549        }
1550
1551        Ok(
1552            if let Some((set, waitable)) = set
1553                .and_then(|set| {
1554                    state
1555                        .get_mut(set)
1556                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1557                        .transpose()
1558                })
1559                .transpose()?
1560            {
1561                let common = waitable.common(state)?;
1562                let handle = common.handle.unwrap();
1563                let event = common.event.take().unwrap();
1564
1565                log::trace!(
1566                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1567                );
1568
1569                waitable.on_delivery(store, self, event);
1570
1571                Some((event, Some((waitable, handle))))
1572            } else {
1573                None
1574            },
1575        )
1576    }
1577
1578    /// Handle the `CallbackCode` returned from an async-lifted export or its
1579    /// callback.
1580    fn handle_callback_code(
1581        self,
1582        store: &mut StoreOpaque,
1583        guest_thread: QualifiedThreadId,
1584        runtime_instance: RuntimeComponentInstanceIndex,
1585        code: u32,
1586    ) -> Result<()> {
1587        let (code, set) = unpack_callback_code(code);
1588
1589        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1590
1591        let state = store.concurrent_state_mut();
1592
1593        let get_set = |store, handle| {
1594            if handle == 0 {
1595                bail!("invalid waitable-set handle");
1596            }
1597
1598            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1599                .waitable_set_rep(handle)?;
1600
1601            Ok(TableId::<WaitableSet>::new(set))
1602        };
1603
1604        match code {
1605            callback_code::EXIT => {
1606                log::trace!("implicit thread {guest_thread:?} completed");
1607                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1608                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1609                if task.threads.is_empty() && !task.returned_or_cancelled() {
1610                    bail!(Trap::NoAsyncResult);
1611                }
1612                match &task.caller {
1613                    Caller::Host { .. } => {
1614                        if task.ready_to_delete() {
1615                            Waitable::Guest(guest_thread.task)
1616                                .delete_from(store.concurrent_state_mut())?;
1617                        }
1618                    }
1619                    Caller::Guest { .. } => {
1620                        task.exited = true;
1621                        task.callback = None;
1622                    }
1623                }
1624            }
1625            callback_code::YIELD => {
1626                // Push this thread onto the "low priority" queue so it runs after
1627                // any other threads have had a chance to run.
1628                let task = state.get_mut(guest_thread.task)?;
1629                assert!(task.event.is_none());
1630                task.event = Some(Event::None);
1631                state.push_low_priority(WorkItem::GuestCall(GuestCall {
1632                    thread: guest_thread,
1633                    kind: GuestCallKind::DeliverEvent {
1634                        instance: self,
1635                        set: None,
1636                    },
1637                }));
1638            }
1639            callback_code::WAIT | callback_code::POLL => {
1640                let set = get_set(store, set)?;
1641                let state = store.concurrent_state_mut();
1642
1643                if state.get_mut(guest_thread.task)?.event.is_some()
1644                    || !state.get_mut(set)?.ready.is_empty()
1645                {
1646                    // An event is immediately available; deliver it ASAP.
1647                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1648                        thread: guest_thread,
1649                        kind: GuestCallKind::DeliverEvent {
1650                            instance: self,
1651                            set: Some(set),
1652                        },
1653                    }));
1654                } else {
1655                    // No event is immediately available.
1656                    match code {
1657                        callback_code::POLL => {
1658                            // We're polling, so just yield and check whether an
1659                            // event has arrived after that.
1660                            state.push_low_priority(WorkItem::Poll(PollParams {
1661                                instance: self,
1662                                thread: guest_thread,
1663                                set,
1664                            }));
1665                        }
1666                        callback_code::WAIT => {
1667                            // We're waiting, so register to be woken up when an
1668                            // event is published for this waitable set.
1669                            //
1670                            // Here we also set `GuestTask::wake_on_cancel`
1671                            // which allows `subtask.cancel` to interrupt the
1672                            // wait.
1673                            let old = state
1674                                .get_mut(guest_thread.thread)?
1675                                .wake_on_cancel
1676                                .replace(set);
1677                            assert!(old.is_none());
1678                            let old = state
1679                                .get_mut(set)?
1680                                .waiting
1681                                .insert(guest_thread, WaitMode::Callback(self));
1682                            assert!(old.is_none());
1683                        }
1684                        _ => unreachable!(),
1685                    }
1686                }
1687            }
1688            _ => bail!("unsupported callback code: {code}"),
1689        }
1690
1691        Ok(())
1692    }
1693
1694    fn cleanup_thread(
1695        self,
1696        store: &mut StoreOpaque,
1697        guest_thread: QualifiedThreadId,
1698        runtime_instance: RuntimeComponentInstanceIndex,
1699    ) -> Result<()> {
1700        let guest_id = store
1701            .concurrent_state_mut()
1702            .get_mut(guest_thread.thread)?
1703            .instance_rep;
1704        self.id().get_mut(store).guest_tables().0[runtime_instance]
1705            .guest_thread_remove(guest_id.unwrap())?;
1706
1707        store.concurrent_state_mut().delete(guest_thread.thread)?;
1708        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1709        task.threads.remove(&guest_thread.thread);
1710        Ok(())
1711    }
1712
1713    /// Add the specified guest call to the "high priority" work item queue, to
1714    /// be started as soon as backpressure and/or reentrance rules allow.
1715    ///
1716    /// SAFETY: The raw pointer arguments must be valid references to guest
1717    /// functions (with the appropriate signatures) when the closures queued by
1718    /// this function are called.
1719    unsafe fn queue_call<T: 'static>(
1720        self,
1721        mut store: StoreContextMut<T>,
1722        guest_thread: QualifiedThreadId,
1723        callee: SendSyncPtr<VMFuncRef>,
1724        param_count: usize,
1725        result_count: usize,
1726        flags: Option<InstanceFlags>,
1727        async_: bool,
1728        callback: Option<SendSyncPtr<VMFuncRef>>,
1729        post_return: Option<SendSyncPtr<VMFuncRef>>,
1730    ) -> Result<()> {
1731        /// Return a closure which will call the specified function in the scope
1732        /// of the specified task.
1733        ///
1734        /// This will use `GuestTask::lower_params` to lower the parameters, but
1735        /// will not lift the result; instead, it returns a
1736        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1737        /// any, may be lifted.  Note that an async-lifted export will have
1738        /// returned its result using the `task.return` intrinsic (or not
1739        /// returned a result at all, in the case of `task.cancel`), in which
1740        /// case the "result" of this call will either be a callback code or
1741        /// nothing.
1742        ///
1743        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1744        /// the returned closure is called.
1745        unsafe fn make_call<T: 'static>(
1746            store: StoreContextMut<T>,
1747            guest_thread: QualifiedThreadId,
1748            callee: SendSyncPtr<VMFuncRef>,
1749            param_count: usize,
1750            result_count: usize,
1751            flags: Option<InstanceFlags>,
1752        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1753        + Send
1754        + Sync
1755        + 'static
1756        + use<T> {
1757            let token = StoreToken::new(store);
1758            move |store: &mut dyn VMStore| {
1759                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1760
1761                store
1762                    .concurrent_state_mut()
1763                    .get_mut(guest_thread.thread)?
1764                    .state = GuestThreadState::Running;
1765                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1766                let may_enter_after_call = task.call_post_return_automatically();
1767                let lower = task.lower_params.take().unwrap();
1768
1769                lower(store, &mut storage[..param_count])?;
1770
1771                let mut store = token.as_context_mut(store);
1772
1773                // SAFETY: Per the contract documented in `make_call's`
1774                // documentation, `callee` must be a valid pointer.
1775                unsafe {
1776                    if let Some(mut flags) = flags {
1777                        flags.set_may_enter(false);
1778                    }
1779                    crate::Func::call_unchecked_raw(
1780                        &mut store,
1781                        callee.as_non_null(),
1782                        NonNull::new(
1783                            &mut storage[..param_count.max(result_count)]
1784                                as *mut [MaybeUninit<ValRaw>] as _,
1785                        )
1786                        .unwrap(),
1787                    )?;
1788                    if let Some(mut flags) = flags {
1789                        flags.set_may_enter(may_enter_after_call);
1790                    }
1791                }
1792
1793                Ok(storage)
1794            }
1795        }
1796
1797        // SAFETY: Per the contract described in this function documentation,
1798        // the `callee` pointer which `call` closes over must be valid when
1799        // called by the closure we queue below.
1800        let call = unsafe {
1801            make_call(
1802                store.as_context_mut(),
1803                guest_thread,
1804                callee,
1805                param_count,
1806                result_count,
1807                flags,
1808            )
1809        };
1810
1811        let callee_instance = store
1812            .0
1813            .concurrent_state_mut()
1814            .get_mut(guest_thread.task)?
1815            .instance;
1816        let fun = if callback.is_some() {
1817            assert!(async_);
1818
1819            Box::new(move |store: &mut dyn VMStore| {
1820                self.add_guest_thread_to_instance_table(
1821                    guest_thread.thread,
1822                    store,
1823                    callee_instance,
1824                )?;
1825                let old_thread = store
1826                    .concurrent_state_mut()
1827                    .guest_thread
1828                    .replace(guest_thread);
1829                log::trace!(
1830                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1831                );
1832
1833                store.maybe_push_call_context(guest_thread.task)?;
1834
1835                store.concurrent_state_mut().enter_instance(callee_instance);
1836
1837                // SAFETY: See the documentation for `make_call` to review the
1838                // contract we must uphold for `call` here.
1839                //
1840                // Per the contract described in the `queue_call`
1841                // documentation, the `callee` pointer which `call` closes
1842                // over must be valid.
1843                let storage = call(store)?;
1844
1845                store
1846                    .concurrent_state_mut()
1847                    .exit_instance(callee_instance)?;
1848
1849                store.maybe_pop_call_context(guest_thread.task)?;
1850
1851                let state = store.concurrent_state_mut();
1852                state.guest_thread = old_thread;
1853                old_thread
1854                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1855                log::trace!("stackless call: restored {old_thread:?} as current thread");
1856
1857                // SAFETY: `wasmparser` will have validated that the callback
1858                // function returns a `i32` result.
1859                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1860
1861                self.handle_callback_code(store, guest_thread, callee_instance, code)?;
1862
1863                Ok(())
1864            }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1865        } else {
1866            let token = StoreToken::new(store.as_context_mut());
1867            Box::new(move |store: &mut dyn VMStore| {
1868                self.add_guest_thread_to_instance_table(
1869                    guest_thread.thread,
1870                    store,
1871                    callee_instance,
1872                )?;
1873                let old_thread = store
1874                    .concurrent_state_mut()
1875                    .guest_thread
1876                    .replace(guest_thread);
1877                log::trace!(
1878                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1879                );
1880                let mut flags = self.id().get(store).instance_flags(callee_instance);
1881
1882                store.maybe_push_call_context(guest_thread.task)?;
1883
1884                // Unless this is a callback-less (i.e. stackful)
1885                // async-lifted export, we need to record that the instance
1886                // cannot be entered until the call returns.
1887                if !async_ {
1888                    store.concurrent_state_mut().enter_instance(callee_instance);
1889                }
1890
1891                // SAFETY: See the documentation for `make_call` to review the
1892                // contract we must uphold for `call` here.
1893                //
1894                // Per the contract described in the `queue_call`
1895                // documentation, the `callee` pointer which `call` closes
1896                // over must be valid.
1897                let storage = call(store)?;
1898
1899                // This is a callback-less call, so the implicit thread has now completed
1900                self.cleanup_thread(store, guest_thread, callee_instance)?;
1901
1902                if async_ {
1903                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1904                    if task.threads.is_empty() && !task.returned_or_cancelled() {
1905                        bail!(Trap::NoAsyncResult);
1906                    }
1907                } else {
1908                    // This is a sync-lifted export, so now is when we lift the
1909                    // result, optionally call the post-return function, if any,
1910                    // and finally notify any current or future waiters that the
1911                    // subtask has returned.
1912
1913                    let lift = {
1914                        let state = store.concurrent_state_mut();
1915                        state.exit_instance(callee_instance)?;
1916
1917                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
1918
1919                        state
1920                            .get_mut(guest_thread.task)?
1921                            .lift_result
1922                            .take()
1923                            .unwrap()
1924                    };
1925
1926                    // SAFETY: `result_count` represents the number of core Wasm
1927                    // results returned, per `wasmparser`.
1928                    let result = (lift.lift)(store, unsafe {
1929                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1930                            &storage[..result_count],
1931                        )
1932                    })?;
1933
1934                    let post_return_arg = match result_count {
1935                        0 => ValRaw::i32(0),
1936                        // SAFETY: `result_count` represents the number of
1937                        // core Wasm results returned, per `wasmparser`.
1938                        1 => unsafe { storage[0].assume_init() },
1939                        _ => unreachable!(),
1940                    };
1941
1942                    if store
1943                        .concurrent_state_mut()
1944                        .get_mut(guest_thread.task)?
1945                        .call_post_return_automatically()
1946                    {
1947                        unsafe {
1948                            flags.set_may_leave(false);
1949                            flags.set_needs_post_return(false);
1950                        }
1951
1952                        if let Some(func) = post_return {
1953                            let mut store = token.as_context_mut(store);
1954
1955                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1956                            // either `wasmtime-cranelift`-generated fused adapter
1957                            // code or `component::Options`.  Per `wasmparser`
1958                            // post-return signature validation, we know it takes a
1959                            // single parameter.
1960                            unsafe {
1961                                crate::Func::call_unchecked_raw(
1962                                    &mut store,
1963                                    func.as_non_null(),
1964                                    slice::from_ref(&post_return_arg).into(),
1965                                )?;
1966                            }
1967                        }
1968
1969                        unsafe {
1970                            flags.set_may_leave(true);
1971                            flags.set_may_enter(true);
1972                        }
1973                    }
1974
1975                    self.task_complete(
1976                        store,
1977                        guest_thread.task,
1978                        result,
1979                        Status::Returned,
1980                        post_return_arg,
1981                    )?;
1982                }
1983
1984                store.maybe_pop_call_context(guest_thread.task)?;
1985
1986                let state = store.concurrent_state_mut();
1987                let task = state.get_mut(guest_thread.task)?;
1988
1989                match &task.caller {
1990                    Caller::Host { .. } => {
1991                        if task.ready_to_delete() {
1992                            Waitable::Guest(guest_thread.task).delete_from(state)?;
1993                        }
1994                    }
1995                    Caller::Guest { .. } => {
1996                        task.exited = true;
1997                    }
1998                }
1999
2000                Ok(())
2001            })
2002        };
2003
2004        store
2005            .0
2006            .concurrent_state_mut()
2007            .push_high_priority(WorkItem::GuestCall(GuestCall {
2008                thread: guest_thread,
2009                kind: GuestCallKind::StartImplicit(fun),
2010            }));
2011
2012        Ok(())
2013    }
2014
2015    /// Prepare (but do not start) a guest->guest call.
2016    ///
2017    /// This is called from fused adapter code generated in
2018    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2019    /// are synthesized Wasm functions which move the parameters from the caller
2020    /// to the callee and the result from the callee to the caller,
2021    /// respectively.  The adapter will call `Self::start_call` immediately
2022    /// after calling this function.
2023    ///
2024    /// SAFETY: All the pointer arguments must be valid pointers to guest
2025    /// entities (and with the expected signatures for the function references
2026    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2027    unsafe fn prepare_call<T: 'static>(
2028        self,
2029        mut store: StoreContextMut<T>,
2030        start: *mut VMFuncRef,
2031        return_: *mut VMFuncRef,
2032        caller_instance: RuntimeComponentInstanceIndex,
2033        callee_instance: RuntimeComponentInstanceIndex,
2034        task_return_type: TypeTupleIndex,
2035        memory: *mut VMMemoryDefinition,
2036        string_encoding: u8,
2037        caller_info: CallerInfo,
2038    ) -> Result<()> {
2039        self.id().get(store.0).check_may_leave(caller_instance)?;
2040
2041        enum ResultInfo {
2042            Heap { results: u32 },
2043            Stack { result_count: u32 },
2044        }
2045
2046        let result_info = match &caller_info {
2047            CallerInfo::Async {
2048                has_result: true,
2049                params,
2050            } => ResultInfo::Heap {
2051                results: params.last().unwrap().get_u32(),
2052            },
2053            CallerInfo::Async {
2054                has_result: false, ..
2055            } => ResultInfo::Stack { result_count: 0 },
2056            CallerInfo::Sync {
2057                result_count,
2058                params,
2059            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2060                results: params.last().unwrap().get_u32(),
2061            },
2062            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2063                result_count: *result_count,
2064            },
2065        };
2066
2067        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2068
2069        // Create a new guest task for the call, closing over the `start` and
2070        // `return_` functions to lift the parameters and lower the result,
2071        // respectively.
2072        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2073        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2074        let token = StoreToken::new(store.as_context_mut());
2075        let state = store.0.concurrent_state_mut();
2076        let old_thread = state.guest_thread.take();
2077        let new_task = GuestTask::new(
2078            state,
2079            Box::new(move |store, dst| {
2080                let mut store = token.as_context_mut(store);
2081                assert!(dst.len() <= MAX_FLAT_PARAMS);
2082                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2083                let count = match caller_info {
2084                    // Async callers, if they have a result, use the last
2085                    // parameter as a return pointer so chop that off if
2086                    // relevant here.
2087                    CallerInfo::Async { params, has_result } => {
2088                        let params = &params[..params.len() - usize::from(has_result)];
2089                        for (param, src) in params.iter().zip(&mut src) {
2090                            src.write(*param);
2091                        }
2092                        params.len()
2093                    }
2094
2095                    // Sync callers forward everything directly.
2096                    CallerInfo::Sync { params, .. } => {
2097                        for (param, src) in params.iter().zip(&mut src) {
2098                            src.write(*param);
2099                        }
2100                        params.len()
2101                    }
2102                };
2103                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2104                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2105                // how it was constructed (see
2106                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2107                // for details) we know it takes count parameters and returns
2108                // `dst.len()` results.
2109                unsafe {
2110                    crate::Func::call_unchecked_raw(
2111                        &mut store,
2112                        start.as_non_null(),
2113                        NonNull::new(
2114                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2115                        )
2116                        .unwrap(),
2117                    )?;
2118                }
2119                dst.copy_from_slice(&src[..dst.len()]);
2120                let state = store.0.concurrent_state_mut();
2121                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2122                    state,
2123                    Some(Event::Subtask {
2124                        status: Status::Started,
2125                    }),
2126                )?;
2127                Ok(())
2128            }),
2129            LiftResult {
2130                lift: Box::new(move |store, src| {
2131                    // SAFETY: See comment in closure passed as `lower_params`
2132                    // parameter above.
2133                    let mut store = token.as_context_mut(store);
2134                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2135                    if let ResultInfo::Heap { results } = &result_info {
2136                        my_src.push(ValRaw::u32(*results));
2137                    }
2138                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2139                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2140                    // on how it was constructed (see
2141                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2142                    // for details) we know it takes `src.len()` parameters and
2143                    // returns up to 1 result.
2144                    unsafe {
2145                        crate::Func::call_unchecked_raw(
2146                            &mut store,
2147                            return_.as_non_null(),
2148                            my_src.as_mut_slice().into(),
2149                        )?;
2150                    }
2151                    let state = store.0.concurrent_state_mut();
2152                    let thread = state.guest_thread.unwrap();
2153                    if sync_caller {
2154                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2155                            if let ResultInfo::Stack { result_count } = &result_info {
2156                                match result_count {
2157                                    0 => None,
2158                                    1 => Some(my_src[0]),
2159                                    _ => unreachable!(),
2160                                }
2161                            } else {
2162                                None
2163                            },
2164                        );
2165                    }
2166                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2167                }),
2168                ty: task_return_type,
2169                memory: NonNull::new(memory).map(SendSyncPtr::new),
2170                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2171            },
2172            Caller::Guest {
2173                thread: old_thread.unwrap(),
2174                instance: caller_instance,
2175            },
2176            None,
2177            callee_instance,
2178        )?;
2179
2180        let guest_task = state.push(new_task)?;
2181        let new_thread = GuestThread::new_implicit(guest_task);
2182        let guest_thread = state.push(new_thread)?;
2183        state.get_mut(guest_task)?.threads.insert(guest_thread);
2184
2185        let state = store.0.concurrent_state_mut();
2186        if let Some(old_thread) = old_thread {
2187            if !state.may_enter(guest_task) {
2188                bail!(crate::Trap::CannotEnterComponent);
2189            }
2190
2191            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2192        };
2193
2194        // Make the new thread the current one so that `Self::start_call` knows
2195        // which one to start.
2196        state.guest_thread = Some(QualifiedThreadId {
2197            task: guest_task,
2198            thread: guest_thread,
2199        });
2200        log::trace!(
2201            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2202        );
2203
2204        Ok(())
2205    }
2206
2207    /// Call the specified callback function for an async-lifted export.
2208    ///
2209    /// SAFETY: `function` must be a valid reference to a guest function of the
2210    /// correct signature for a callback.
2211    unsafe fn call_callback<T>(
2212        self,
2213        mut store: StoreContextMut<T>,
2214        callee_instance: RuntimeComponentInstanceIndex,
2215        function: SendSyncPtr<VMFuncRef>,
2216        event: Event,
2217        handle: u32,
2218        may_enter_after_call: bool,
2219    ) -> Result<u32> {
2220        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2221
2222        let (ordinal, result) = event.parts();
2223        let params = &mut [
2224            ValRaw::u32(ordinal),
2225            ValRaw::u32(handle),
2226            ValRaw::u32(result),
2227        ];
2228        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2229        // `wasmtime-cranelift`-generated fused adapter code or
2230        // `component::Options`.  Per `wasmparser` callback signature
2231        // validation, we know it takes three parameters and returns one.
2232        unsafe {
2233            flags.set_may_enter(false);
2234            crate::Func::call_unchecked_raw(
2235                &mut store,
2236                function.as_non_null(),
2237                params.as_mut_slice().into(),
2238            )?;
2239            flags.set_may_enter(may_enter_after_call);
2240        }
2241        Ok(params[0].get_u32())
2242    }
2243
2244    /// Start a guest->guest call previously prepared using
2245    /// `Self::prepare_call`.
2246    ///
2247    /// This is called from fused adapter code generated in
2248    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2249    /// this function immediately after calling `Self::prepare_call`.
2250    ///
2251    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2252    /// functions with the appropriate signatures for the current guest task.
2253    /// If this is a call to an async-lowered import, the actual call may be
2254    /// deferred and run after this function returns, in which case the pointer
2255    /// arguments must also be valid when the call happens.
2256    unsafe fn start_call<T: 'static>(
2257        self,
2258        mut store: StoreContextMut<T>,
2259        callback: *mut VMFuncRef,
2260        post_return: *mut VMFuncRef,
2261        callee: *mut VMFuncRef,
2262        param_count: u32,
2263        result_count: u32,
2264        flags: u32,
2265        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2266    ) -> Result<u32> {
2267        let token = StoreToken::new(store.as_context_mut());
2268        let async_caller = storage.is_none();
2269        let state = store.0.concurrent_state_mut();
2270        let guest_thread = state.guest_thread.unwrap();
2271        let may_enter_after_call = state
2272            .get_mut(guest_thread.task)?
2273            .call_post_return_automatically();
2274        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2275        let param_count = usize::try_from(param_count).unwrap();
2276        assert!(param_count <= MAX_FLAT_PARAMS);
2277        let result_count = usize::try_from(result_count).unwrap();
2278        assert!(result_count <= MAX_FLAT_RESULTS);
2279
2280        let task = state.get_mut(guest_thread.task)?;
2281        if !callback.is_null() {
2282            // We're calling an async-lifted export with a callback, so store
2283            // the callback and related context as part of the task so we can
2284            // call it later when needed.
2285            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2286            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2287                let store = token.as_context_mut(store);
2288                unsafe {
2289                    self.call_callback::<T>(
2290                        store,
2291                        runtime_instance,
2292                        callback,
2293                        event,
2294                        handle,
2295                        may_enter_after_call,
2296                    )
2297                }
2298            }));
2299        }
2300
2301        let Caller::Guest {
2302            thread: caller,
2303            instance: runtime_instance,
2304        } = &task.caller
2305        else {
2306            // As of this writing, `start_call` is only used for guest->guest
2307            // calls.
2308            unreachable!()
2309        };
2310        let caller = *caller;
2311        let caller_instance = *runtime_instance;
2312
2313        let callee_instance = task.instance;
2314
2315        let instance_flags = if callback.is_null() {
2316            None
2317        } else {
2318            Some(self.id().get(store.0).instance_flags(callee_instance))
2319        };
2320
2321        // Queue the call as a "high priority" work item.
2322        unsafe {
2323            self.queue_call(
2324                store.as_context_mut(),
2325                guest_thread,
2326                callee,
2327                param_count,
2328                result_count,
2329                instance_flags,
2330                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2331                NonNull::new(callback).map(SendSyncPtr::new),
2332                NonNull::new(post_return).map(SendSyncPtr::new),
2333            )?;
2334        }
2335
2336        let state = store.0.concurrent_state_mut();
2337
2338        // Use the caller's `GuestTask::sync_call_set` to register interest in
2339        // the subtask...
2340        let guest_waitable = Waitable::Guest(guest_thread.task);
2341        let old_set = guest_waitable.common(state)?.set;
2342        let set = state.get_mut(caller.task)?.sync_call_set;
2343        guest_waitable.join(state, Some(set))?;
2344
2345        // ... and suspend this fiber temporarily while we wait for it to start.
2346        //
2347        // Note that we _could_ call the callee directly using the current fiber
2348        // rather than suspend this one, but that would make reasoning about the
2349        // event loop more complicated and is probably only worth doing if
2350        // there's a measurable performance benefit.  In addition, it would mean
2351        // blocking the caller if the callee calls a blocking sync-lowered
2352        // import, and as of this writing the spec says we must not do that.
2353        //
2354        // Alternatively, the fused adapter code could be modified to call the
2355        // callee directly without calling a host-provided intrinsic at all (in
2356        // which case it would need to do its own, inline backpressure checks,
2357        // etc.).  Again, we'd want to see a measurable performance benefit
2358        // before committing to such an optimization.  And again, we'd need to
2359        // update the spec to allow that.
2360        let (status, waitable) = loop {
2361            store.0.suspend(SuspendReason::Waiting {
2362                set,
2363                thread: caller,
2364            })?;
2365
2366            let state = store.0.concurrent_state_mut();
2367
2368            log::trace!("taking event for {:?}", guest_thread.task);
2369            let event = guest_waitable.take_event(state)?;
2370            let Some(Event::Subtask { status }) = event else {
2371                unreachable!();
2372            };
2373
2374            log::trace!("status {status:?} for {:?}", guest_thread.task);
2375
2376            if status == Status::Returned {
2377                // It returned, so we can stop waiting.
2378                break (status, None);
2379            } else if async_caller {
2380                // It hasn't returned yet, but the caller is calling via an
2381                // async-lowered import, so we generate a handle for the task
2382                // waitable and return the status.
2383                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2384                    .subtask_insert_guest(guest_thread.task.rep())?;
2385                store
2386                    .0
2387                    .concurrent_state_mut()
2388                    .get_mut(guest_thread.task)?
2389                    .common
2390                    .handle = Some(handle);
2391                break (status, Some(handle));
2392            } else {
2393                // The callee hasn't returned yet, and the caller is calling via
2394                // a sync-lowered import, so we loop and keep waiting until the
2395                // callee returns.
2396            }
2397        };
2398
2399        let state = store.0.concurrent_state_mut();
2400
2401        guest_waitable.join(state, old_set)?;
2402
2403        if let Some(storage) = storage {
2404            // The caller used a sync-lowered import to call an async-lifted
2405            // export, in which case the result, if any, has been stashed in
2406            // `GuestTask::sync_result`.
2407            let task = state.get_mut(guest_thread.task)?;
2408            if let Some(result) = task.sync_result.take() {
2409                if let Some(result) = result {
2410                    storage[0] = MaybeUninit::new(result);
2411                }
2412
2413                if task.exited {
2414                    if task.ready_to_delete() {
2415                        Waitable::Guest(guest_thread.task).delete_from(state)?;
2416                    }
2417                }
2418            }
2419        }
2420
2421        // Reset the current thread to point to the caller as it resumes control.
2422        state.guest_thread = Some(caller);
2423        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2424        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2425
2426        Ok(status.pack(waitable))
2427    }
2428
2429    /// Poll the specified future once on behalf of a guest->host call using an
2430    /// async-lowered import.
2431    ///
2432    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2433    /// `Pending`, add it to the set of futures to be polled as part of this
2434    /// instance's event loop until it completes, and then return
2435    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2436    ///
2437    /// Whether the future returns `Ready` immediately or later, the `lower`
2438    /// function will be used to lower the result, if any, into the guest caller's
2439    /// stack and linear memory unless the task has been cancelled.
2440    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2441        self,
2442        mut store: StoreContextMut<T>,
2443        future: impl Future<Output = Result<R>> + Send + 'static,
2444        caller_instance: RuntimeComponentInstanceIndex,
2445        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2446    ) -> Result<Option<u32>> {
2447        let token = StoreToken::new(store.as_context_mut());
2448        let state = store.0.concurrent_state_mut();
2449        let caller = state.guest_thread.unwrap();
2450
2451        // Create an abortable future which hooks calls to poll and manages call
2452        // context state for the future.
2453        let (join_handle, future) = JoinHandle::run(async move {
2454            let mut future = pin!(future);
2455            let mut call_context = None;
2456            future::poll_fn(move |cx| {
2457                // Push the call context for managing any resource borrows
2458                // for the task.
2459                tls::get(|store| {
2460                    if let Some(call_context) = call_context.take() {
2461                        token
2462                            .as_context_mut(store)
2463                            .0
2464                            .component_resource_state()
2465                            .0
2466                            .push(call_context);
2467                    }
2468                });
2469
2470                let result = future.as_mut().poll(cx);
2471
2472                if result.is_pending() {
2473                    // Pop the call context for managing any resource
2474                    // borrows for the task.
2475                    tls::get(|store| {
2476                        call_context = Some(
2477                            token
2478                                .as_context_mut(store)
2479                                .0
2480                                .component_resource_state()
2481                                .0
2482                                .pop()
2483                                .unwrap(),
2484                        );
2485                    });
2486                }
2487                result
2488            })
2489            .await
2490        });
2491
2492        // We create a new host task even though it might complete immediately
2493        // (in which case we won't need to pass a waitable back to the guest).
2494        // If it does complete immediately, we'll remove it before we return.
2495        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2496
2497        log::trace!("new host task child of {caller:?}: {task:?}");
2498
2499        let mut future = Box::pin(future);
2500
2501        // Finally, poll the future.  We can use a dummy `Waker` here because
2502        // we'll add the future to `ConcurrentState::futures` and poll it
2503        // automatically from the event loop if it doesn't complete immediately
2504        // here.
2505        let poll = tls::set(store.0, || {
2506            future
2507                .as_mut()
2508                .poll(&mut Context::from_waker(&Waker::noop()))
2509        });
2510
2511        Ok(match poll {
2512            Poll::Ready(None) => unreachable!(),
2513            Poll::Ready(Some(result)) => {
2514                // It finished immediately; lower the result and delete the
2515                // task.
2516                lower(store.as_context_mut(), result?)?;
2517                log::trace!("delete host task {task:?} (already ready)");
2518                store.0.concurrent_state_mut().delete(task)?;
2519                None
2520            }
2521            Poll::Pending => {
2522                // It hasn't finished yet; add the future to
2523                // `ConcurrentState::futures` so it will be polled by the event
2524                // loop and allocate a waitable handle to return to the guest.
2525
2526                // Wrap the future in a closure responsible for lowering the result into
2527                // the guest's stack and memory, as well as notifying any waiters that
2528                // the task returned.
2529                let future =
2530                    Box::pin(async move {
2531                        let result = match future.await {
2532                            Some(result) => result?,
2533                            // Task was cancelled; nothing left to do.
2534                            None => return Ok(()),
2535                        };
2536                        tls::get(move |store| {
2537                            // Here we schedule a task to run on a worker fiber to do
2538                            // the lowering since it may involve a call to the guest's
2539                            // realloc function.  This is necessary because calling the
2540                            // guest while there are host embedder frames on the stack
2541                            // is unsound.
2542                            store.concurrent_state_mut().push_high_priority(
2543                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2544                                    lower(token.as_context_mut(store), result)?;
2545                                    let state = store.concurrent_state_mut();
2546                                    state.get_mut(task)?.join_handle.take();
2547                                    Waitable::Host(task).set_event(
2548                                        state,
2549                                        Some(Event::Subtask {
2550                                            status: Status::Returned,
2551                                        }),
2552                                    )
2553                                }))),
2554                            );
2555                            Ok(())
2556                        })
2557                    });
2558
2559                store.0.concurrent_state_mut().push_future(future);
2560                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2561                    .subtask_insert_host(task.rep())?;
2562                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2563                log::trace!(
2564                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2565                );
2566                Some(handle)
2567            }
2568        })
2569    }
2570
2571    /// Implements the `task.return` intrinsic, lifting the result for the
2572    /// current guest task.
2573    pub(crate) fn task_return(
2574        self,
2575        store: &mut dyn VMStore,
2576        caller: RuntimeComponentInstanceIndex,
2577        ty: TypeTupleIndex,
2578        options: OptionsIndex,
2579        storage: &[ValRaw],
2580    ) -> Result<()> {
2581        self.id().get(store).check_may_leave(caller)?;
2582        let state = store.concurrent_state_mut();
2583        let guest_thread = state.guest_thread.unwrap();
2584        let lift = state
2585            .get_mut(guest_thread.task)?
2586            .lift_result
2587            .take()
2588            .ok_or_else(|| {
2589                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2590            })?;
2591        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2592
2593        let CanonicalOptions {
2594            string_encoding,
2595            data_model,
2596            ..
2597        } = &self.id().get(store).component().env_component().options[options];
2598
2599        let invalid = ty != lift.ty
2600            || string_encoding != &lift.string_encoding
2601            || match data_model {
2602                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2603                    Some(memory) => {
2604                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2605                        let actual = self.id().get(store).runtime_memory(memory);
2606                        expected != actual.as_ptr()
2607                    }
2608                    // Memory not specified, meaning it didn't need to be
2609                    // specified per validation, so not invalid.
2610                    None => false,
2611                },
2612                // Always invalid as this isn't supported.
2613                CanonicalOptionsDataModel::Gc { .. } => true,
2614            };
2615
2616        if invalid {
2617            bail!("invalid `task.return` signature and/or options for current task");
2618        }
2619
2620        log::trace!("task.return for {guest_thread:?}");
2621
2622        let result = (lift.lift)(store, storage)?;
2623        self.task_complete(
2624            store,
2625            guest_thread.task,
2626            result,
2627            Status::Returned,
2628            ValRaw::i32(0),
2629        )
2630    }
2631
2632    /// Implements the `task.cancel` intrinsic.
2633    pub(crate) fn task_cancel(
2634        self,
2635        store: &mut StoreOpaque,
2636        caller: RuntimeComponentInstanceIndex,
2637    ) -> Result<()> {
2638        self.id().get(store).check_may_leave(caller)?;
2639        let state = store.concurrent_state_mut();
2640        let guest_thread = state.guest_thread.unwrap();
2641        let task = state.get_mut(guest_thread.task)?;
2642        if !task.cancel_sent {
2643            bail!("`task.cancel` called by task which has not been cancelled")
2644        }
2645        _ = task.lift_result.take().ok_or_else(|| {
2646            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2647        })?;
2648
2649        assert!(task.result.is_none());
2650
2651        log::trace!("task.cancel for {guest_thread:?}");
2652
2653        self.task_complete(
2654            store,
2655            guest_thread.task,
2656            Box::new(DummyResult),
2657            Status::ReturnCancelled,
2658            ValRaw::i32(0),
2659        )
2660    }
2661
2662    /// Complete the specified guest task (i.e. indicate that it has either
2663    /// returned a (possibly empty) result or cancelled itself).
2664    ///
2665    /// This will return any resource borrows and notify any current or future
2666    /// waiters that the task has completed.
2667    fn task_complete(
2668        self,
2669        store: &mut StoreOpaque,
2670        guest_task: TableId<GuestTask>,
2671        result: Box<dyn Any + Send + Sync>,
2672        status: Status,
2673        post_return_arg: ValRaw,
2674    ) -> Result<()> {
2675        if store
2676            .concurrent_state_mut()
2677            .get_mut(guest_task)?
2678            .call_post_return_automatically()
2679        {
2680            let (calls, host_table, _, instance) =
2681                store.component_resource_state_with_instance(self);
2682            ResourceTables {
2683                calls,
2684                host_table: Some(host_table),
2685                guest: Some(instance.guest_tables()),
2686            }
2687            .exit_call()?;
2688        } else {
2689            // As of this writing, the only scenario where `call_post_return_automatically`
2690            // would be false for a `GuestTask` is for host-to-guest calls using
2691            // `[Typed]Func::call_async`, in which case the `function_index`
2692            // should be a non-`None` value.
2693            let function_index = store
2694                .concurrent_state_mut()
2695                .get_mut(guest_task)?
2696                .function_index
2697                .unwrap();
2698            self.id()
2699                .get_mut(store)
2700                .post_return_arg_set(function_index, post_return_arg);
2701        }
2702
2703        let state = store.concurrent_state_mut();
2704        let task = state.get_mut(guest_task)?;
2705
2706        if let Caller::Host { tx, .. } = &mut task.caller {
2707            if let Some(tx) = tx.take() {
2708                _ = tx.send(result);
2709            }
2710        } else {
2711            task.result = Some(result);
2712            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2713        }
2714
2715        Ok(())
2716    }
2717
2718    /// Implements the `waitable-set.new` intrinsic.
2719    pub(crate) fn waitable_set_new(
2720        self,
2721        store: &mut StoreOpaque,
2722        caller_instance: RuntimeComponentInstanceIndex,
2723    ) -> Result<u32> {
2724        self.id().get_mut(store).check_may_leave(caller_instance)?;
2725        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2726        let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2727            .waitable_set_insert(set.rep())?;
2728        log::trace!("new waitable set {set:?} (handle {handle})");
2729        Ok(handle)
2730    }
2731
2732    /// Implements the `waitable-set.drop` intrinsic.
2733    pub(crate) fn waitable_set_drop(
2734        self,
2735        store: &mut StoreOpaque,
2736        caller_instance: RuntimeComponentInstanceIndex,
2737        set: u32,
2738    ) -> Result<()> {
2739        self.id().get_mut(store).check_may_leave(caller_instance)?;
2740        let rep =
2741            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2742
2743        log::trace!("drop waitable set {rep} (handle {set})");
2744
2745        let set = store
2746            .concurrent_state_mut()
2747            .delete(TableId::<WaitableSet>::new(rep))?;
2748
2749        if !set.waiting.is_empty() {
2750            bail!("cannot drop waitable set with waiters");
2751        }
2752
2753        Ok(())
2754    }
2755
2756    /// Implements the `waitable.join` intrinsic.
2757    pub(crate) fn waitable_join(
2758        self,
2759        store: &mut StoreOpaque,
2760        caller_instance: RuntimeComponentInstanceIndex,
2761        waitable_handle: u32,
2762        set_handle: u32,
2763    ) -> Result<()> {
2764        let mut instance = self.id().get_mut(store);
2765        instance.check_may_leave(caller_instance)?;
2766        let waitable =
2767            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2768
2769        let set = if set_handle == 0 {
2770            None
2771        } else {
2772            let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2773
2774            Some(TableId::<WaitableSet>::new(set))
2775        };
2776
2777        log::trace!(
2778            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2779        );
2780
2781        waitable.join(store.concurrent_state_mut(), set)
2782    }
2783
2784    /// Implements the `subtask.drop` intrinsic.
2785    pub(crate) fn subtask_drop(
2786        self,
2787        store: &mut StoreOpaque,
2788        caller_instance: RuntimeComponentInstanceIndex,
2789        task_id: u32,
2790    ) -> Result<()> {
2791        self.id().get_mut(store).check_may_leave(caller_instance)?;
2792        self.waitable_join(store, caller_instance, task_id, 0)?;
2793
2794        let (rep, is_host) =
2795            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2796
2797        let concurrent_state = store.concurrent_state_mut();
2798        let (waitable, expected_caller_instance, delete) = if is_host {
2799            let id = TableId::<HostTask>::new(rep);
2800            let task = concurrent_state.get_mut(id)?;
2801            if task.join_handle.is_some() {
2802                bail!("cannot drop a subtask which has not yet resolved");
2803            }
2804            (Waitable::Host(id), task.caller_instance, true)
2805        } else {
2806            let id = TableId::<GuestTask>::new(rep);
2807            let task = concurrent_state.get_mut(id)?;
2808            if task.lift_result.is_some() {
2809                bail!("cannot drop a subtask which has not yet resolved");
2810            }
2811            if let Caller::Guest { instance, .. } = &task.caller {
2812                (Waitable::Guest(id), *instance, task.exited)
2813            } else {
2814                unreachable!()
2815            }
2816        };
2817
2818        waitable.common(concurrent_state)?.handle = None;
2819
2820        if waitable.take_event(concurrent_state)?.is_some() {
2821            bail!("cannot drop a subtask with an undelivered event");
2822        }
2823
2824        if delete {
2825            waitable.delete_from(concurrent_state)?;
2826        }
2827
2828        // Since waitables can neither be passed between instances nor forged,
2829        // this should never fail unless there's a bug in Wasmtime, but we check
2830        // here to be sure:
2831        assert_eq!(expected_caller_instance, caller_instance);
2832        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2833        Ok(())
2834    }
2835
2836    /// Implements the `waitable-set.wait` intrinsic.
2837    pub(crate) fn waitable_set_wait(
2838        self,
2839        store: &mut StoreOpaque,
2840        caller: RuntimeComponentInstanceIndex,
2841        options: OptionsIndex,
2842        set: u32,
2843        payload: u32,
2844    ) -> Result<u32> {
2845        self.id().get(store).check_may_leave(caller)?;
2846        let &CanonicalOptions {
2847            cancellable,
2848            instance: caller_instance,
2849            ..
2850        } = &self.id().get(store).component().env_component().options[options];
2851        let rep =
2852            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2853
2854        self.waitable_check(
2855            store,
2856            cancellable,
2857            WaitableCheck::Wait(WaitableCheckParams {
2858                set: TableId::new(rep),
2859                options,
2860                payload,
2861            }),
2862        )
2863    }
2864
2865    /// Implements the `waitable-set.poll` intrinsic.
2866    pub(crate) fn waitable_set_poll(
2867        self,
2868        store: &mut StoreOpaque,
2869        caller: RuntimeComponentInstanceIndex,
2870        options: OptionsIndex,
2871        set: u32,
2872        payload: u32,
2873    ) -> Result<u32> {
2874        self.id().get(store).check_may_leave(caller)?;
2875        let &CanonicalOptions {
2876            cancellable,
2877            instance: caller_instance,
2878            ..
2879        } = &self.id().get(store).component().env_component().options[options];
2880        let rep =
2881            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2882
2883        self.waitable_check(
2884            store,
2885            cancellable,
2886            WaitableCheck::Poll(WaitableCheckParams {
2887                set: TableId::new(rep),
2888                options,
2889                payload,
2890            }),
2891        )
2892    }
2893
2894    /// Implements the `thread.index` intrinsic.
2895    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2896        let thread_id = store
2897            .concurrent_state_mut()
2898            .guest_thread
2899            .ok_or_else(|| anyhow!("no current thread"))?
2900            .thread;
2901        // The unwrap is safe because `instance_rep` must be `Some` by this point
2902        Ok(store
2903            .concurrent_state_mut()
2904            .get_mut(thread_id)?
2905            .instance_rep
2906            .unwrap())
2907    }
2908
2909    /// Implements the `thread.new_indirect` intrinsic.
2910    pub(crate) fn thread_new_indirect<T: 'static>(
2911        self,
2912        mut store: StoreContextMut<T>,
2913        runtime_instance: RuntimeComponentInstanceIndex,
2914        _func_ty_idx: TypeFuncIndex, // currently unused
2915        start_func_table_idx: RuntimeTableIndex,
2916        start_func_idx: u32,
2917        context: i32,
2918    ) -> Result<u32> {
2919        self.id().get(store.0).check_may_leave(runtime_instance)?;
2920
2921        log::trace!("creating new thread");
2922
2923        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2924        let instance = self.id().get_mut(store.0);
2925        let callee = instance
2926            .index_runtime_func_table(start_func_table_idx, start_func_idx as u64)?
2927            .ok_or_else(|| {
2928                anyhow!("the start function index points to an uninitialized function")
2929            })?;
2930        if callee.type_index(store.0) != start_func_ty.type_index() {
2931            bail!(
2932                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2933            );
2934        }
2935
2936        let token = StoreToken::new(store.as_context_mut());
2937        let start_func = Box::new(
2938            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2939                let old_thread = store
2940                    .concurrent_state_mut()
2941                    .guest_thread
2942                    .replace(guest_thread);
2943                log::trace!(
2944                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2945                );
2946
2947                store.maybe_push_call_context(guest_thread.task)?;
2948
2949                let mut store = token.as_context_mut(store);
2950                let mut params = [ValRaw::i32(context)];
2951                // Use call_unchecked rather than call or call_async, as we don't want to run the function
2952                // on a separate fiber if we're running in an async store.
2953                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2954
2955                store.0.maybe_pop_call_context(guest_thread.task)?;
2956
2957                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2958                log::trace!("explicit thread {guest_thread:?} completed");
2959                let state = store.0.concurrent_state_mut();
2960                let task = state.get_mut(guest_thread.task)?;
2961                if task.threads.is_empty() && !task.returned_or_cancelled() {
2962                    bail!(Trap::NoAsyncResult);
2963                }
2964                state.guest_thread = old_thread;
2965                old_thread
2966                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2967                if state.get_mut(guest_thread.task)?.ready_to_delete() {
2968                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2969                }
2970                log::trace!("thread start: restored {old_thread:?} as current thread");
2971
2972                Ok(())
2973            },
2974        );
2975
2976        let state = store.0.concurrent_state_mut();
2977        let current_thread = state.guest_thread.unwrap();
2978        let parent_task = current_thread.task;
2979
2980        let new_thread = GuestThread::new_explicit(parent_task, start_func);
2981        let thread_id = state.push(new_thread)?;
2982        state.get_mut(parent_task)?.threads.insert(thread_id);
2983
2984        log::trace!("new thread with id {thread_id:?} created");
2985
2986        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
2987    }
2988
2989    pub(crate) fn resume_suspended_thread(
2990        self,
2991        store: &mut StoreOpaque,
2992        runtime_instance: RuntimeComponentInstanceIndex,
2993        thread_idx: u32,
2994        high_priority: bool,
2995    ) -> Result<()> {
2996        let thread_id =
2997            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
2998        let state = store.concurrent_state_mut();
2999        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3000        let thread = state.get_mut(guest_thread.thread)?;
3001
3002        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3003            GuestThreadState::NotStartedExplicit(start_func) => {
3004                log::trace!("starting thread {guest_thread:?}");
3005                let guest_call = WorkItem::GuestCall(GuestCall {
3006                    thread: guest_thread,
3007                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3008                        start_func(store, guest_thread)
3009                    })),
3010                });
3011                store
3012                    .concurrent_state_mut()
3013                    .push_work_item(guest_call, high_priority);
3014            }
3015            GuestThreadState::Suspended(fiber) => {
3016                log::trace!("resuming thread {thread_id:?} that was suspended");
3017                store
3018                    .concurrent_state_mut()
3019                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3020            }
3021            _ => {
3022                bail!("cannot resume thread which is not suspended");
3023            }
3024        }
3025        Ok(())
3026    }
3027
3028    fn add_guest_thread_to_instance_table(
3029        self,
3030        thread_id: TableId<GuestThread>,
3031        store: &mut StoreOpaque,
3032        runtime_instance: RuntimeComponentInstanceIndex,
3033    ) -> Result<u32> {
3034        let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3035            .guest_thread_insert(thread_id.rep())?;
3036        store
3037            .concurrent_state_mut()
3038            .get_mut(thread_id)?
3039            .instance_rep = Some(guest_id);
3040        Ok(guest_id)
3041    }
3042
3043    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3044    /// and `thread.switch-to` intrinsics.
3045    pub(crate) fn suspension_intrinsic(
3046        self,
3047        store: &mut StoreOpaque,
3048        caller: RuntimeComponentInstanceIndex,
3049        cancellable: bool,
3050        yielding: bool,
3051        to_thread: Option<u32>,
3052    ) -> Result<WaitResult> {
3053        // There could be a pending cancellation from a previous uncancellable wait
3054        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3055            return Ok(WaitResult::Cancelled);
3056        }
3057
3058        self.id().get(store).check_may_leave(caller)?;
3059
3060        if let Some(thread) = to_thread {
3061            self.resume_suspended_thread(store, caller, thread, true)?;
3062        }
3063
3064        let state = store.concurrent_state_mut();
3065        let guest_thread = state.guest_thread.unwrap();
3066        let reason = if yielding {
3067            SuspendReason::Yielding {
3068                thread: guest_thread,
3069            }
3070        } else {
3071            SuspendReason::ExplicitlySuspending {
3072                thread: guest_thread,
3073            }
3074        };
3075
3076        store.suspend(reason)?;
3077
3078        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3079            Ok(WaitResult::Cancelled)
3080        } else {
3081            Ok(WaitResult::Completed)
3082        }
3083    }
3084
3085    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3086    fn waitable_check(
3087        self,
3088        store: &mut StoreOpaque,
3089        cancellable: bool,
3090        check: WaitableCheck,
3091    ) -> Result<u32> {
3092        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3093
3094        let (wait, set) = match &check {
3095            WaitableCheck::Wait(params) => (true, Some(params.set)),
3096            WaitableCheck::Poll(params) => (false, Some(params.set)),
3097        };
3098
3099        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3100        // First, suspend this fiber, allowing any other threads to run.
3101        store.suspend(SuspendReason::Yielding {
3102            thread: guest_thread,
3103        })?;
3104
3105        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3106
3107        let state = store.concurrent_state_mut();
3108        let task = state.get_mut(guest_thread.task)?;
3109
3110        // If we're waiting, and there are no events immediately available,
3111        // suspend the fiber until that changes.
3112        if wait {
3113            let set = set.unwrap();
3114
3115            if (task.event.is_none()
3116                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3117                && state.get_mut(set)?.ready.is_empty()
3118            {
3119                if cancellable {
3120                    let old = state
3121                        .get_mut(guest_thread.thread)?
3122                        .wake_on_cancel
3123                        .replace(set);
3124                    assert!(old.is_none());
3125                }
3126
3127                store.suspend(SuspendReason::Waiting {
3128                    set,
3129                    thread: guest_thread,
3130                })?;
3131            }
3132        }
3133
3134        log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3135
3136        let result = match check {
3137            // Deliver any pending events to the guest and return.
3138            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3139                let event =
3140                    self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3141
3142                let (ordinal, handle, result) = if wait {
3143                    let (event, waitable) = event.unwrap();
3144                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3145                    let (ordinal, result) = event.parts();
3146                    (ordinal, handle, result)
3147                } else {
3148                    if let Some((event, waitable)) = event {
3149                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3150                        let (ordinal, result) = event.parts();
3151                        (ordinal, handle, result)
3152                    } else {
3153                        log::trace!(
3154                            "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3155                            guest_thread.task,
3156                            params.set
3157                        );
3158                        let (ordinal, result) = Event::None.parts();
3159                        (ordinal, 0, result)
3160                    }
3161                };
3162                let memory = self.options_memory_mut(store, params.options);
3163                let ptr =
3164                    func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
3165                memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3166                memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3167                Ok(ordinal)
3168            }
3169        };
3170
3171        result
3172    }
3173
3174    /// Implements the `subtask.cancel` intrinsic.
3175    pub(crate) fn subtask_cancel(
3176        self,
3177        store: &mut StoreOpaque,
3178        caller_instance: RuntimeComponentInstanceIndex,
3179        async_: bool,
3180        task_id: u32,
3181    ) -> Result<u32> {
3182        self.id().get(store).check_may_leave(caller_instance)?;
3183        let (rep, is_host) =
3184            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
3185        let (waitable, expected_caller_instance) = if is_host {
3186            let id = TableId::<HostTask>::new(rep);
3187            (
3188                Waitable::Host(id),
3189                store.concurrent_state_mut().get_mut(id)?.caller_instance,
3190            )
3191        } else {
3192            let id = TableId::<GuestTask>::new(rep);
3193            if let Caller::Guest { instance, .. } =
3194                &store.concurrent_state_mut().get_mut(id)?.caller
3195            {
3196                (Waitable::Guest(id), *instance)
3197            } else {
3198                unreachable!()
3199            }
3200        };
3201        // Since waitables can neither be passed between instances nor forged,
3202        // this should never fail unless there's a bug in Wasmtime, but we check
3203        // here to be sure:
3204        assert_eq!(expected_caller_instance, caller_instance);
3205
3206        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3207
3208        let concurrent_state = store.concurrent_state_mut();
3209        if let Waitable::Host(host_task) = waitable {
3210            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3211                handle.abort();
3212                return Ok(Status::ReturnCancelled as u32);
3213            }
3214        } else {
3215            let caller = concurrent_state.guest_thread.unwrap();
3216            let guest_task = TableId::<GuestTask>::new(rep);
3217            let task = concurrent_state.get_mut(guest_task)?;
3218            if !task.already_lowered_parameters() {
3219                task.lower_params = None;
3220                task.lift_result = None;
3221
3222                // Not yet started; cancel and remove from pending
3223                let callee_instance = task.instance;
3224
3225                let pending = &mut concurrent_state.instance_state(callee_instance).pending;
3226                let pending_count = pending.len();
3227                pending.retain(|thread, _| thread.task != guest_task);
3228                // If there were no pending threads for this task, we're in an error state
3229                if pending.len() == pending_count {
3230                    bail!("`subtask.cancel` called after terminal status delivered");
3231                }
3232                return Ok(Status::StartCancelled as u32);
3233            } else if !task.returned_or_cancelled() {
3234                // Started, but not yet returned or cancelled; send the
3235                // `CANCELLED` event
3236                task.cancel_sent = true;
3237                // Note that this might overwrite an event that was set earlier
3238                // (e.g. `Event::None` if the task is yielding, or
3239                // `Event::Cancelled` if it was already cancelled), but that's
3240                // okay -- this should supersede the previous state.
3241                task.event = Some(Event::Cancelled);
3242                for thread in task.threads.clone() {
3243                    let thread = QualifiedThreadId {
3244                        task: guest_task,
3245                        thread,
3246                    };
3247                    if let Some(set) = concurrent_state
3248                        .get_mut(thread.thread)
3249                        .unwrap()
3250                        .wake_on_cancel
3251                        .take()
3252                    {
3253                        let item = match concurrent_state
3254                            .get_mut(set)?
3255                            .waiting
3256                            .remove(&thread)
3257                            .unwrap()
3258                        {
3259                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3260                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3261                                thread,
3262                                kind: GuestCallKind::DeliverEvent {
3263                                    instance,
3264                                    set: None,
3265                                },
3266                            }),
3267                        };
3268                        concurrent_state.push_high_priority(item);
3269
3270                        store.suspend(SuspendReason::Yielding { thread: caller })?;
3271                        break;
3272                    }
3273                }
3274
3275                let concurrent_state = store.concurrent_state_mut();
3276                let task = concurrent_state.get_mut(guest_task)?;
3277                if !task.returned_or_cancelled() {
3278                    if async_ {
3279                        return Ok(BLOCKED);
3280                    } else {
3281                        store.wait_for_event(Waitable::Guest(guest_task))?;
3282                    }
3283                }
3284            }
3285        }
3286
3287        let event = waitable.take_event(store.concurrent_state_mut())?;
3288        if let Some(Event::Subtask {
3289            status: status @ (Status::Returned | Status::ReturnCancelled),
3290        }) = event
3291        {
3292            Ok(status as u32)
3293        } else {
3294            bail!("`subtask.cancel` called after terminal status delivered");
3295        }
3296    }
3297
3298    pub(crate) fn context_get(
3299        self,
3300        store: &mut StoreOpaque,
3301        caller: RuntimeComponentInstanceIndex,
3302        slot: u32,
3303    ) -> Result<u32> {
3304        self.id().get(store).check_may_leave(caller)?;
3305        store.concurrent_state_mut().context_get(slot)
3306    }
3307
3308    pub(crate) fn context_set(
3309        self,
3310        store: &mut StoreOpaque,
3311        caller: RuntimeComponentInstanceIndex,
3312        slot: u32,
3313        value: u32,
3314    ) -> Result<()> {
3315        self.id().get(store).check_may_leave(caller)?;
3316        store.concurrent_state_mut().context_set(slot, value)
3317    }
3318}
3319
3320/// Trait representing component model ABI async intrinsics and fused adapter
3321/// helper functions.
3322///
3323/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3324/// which must be valid for at least the duration of the call (and possibly for
3325/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3326/// pointers used for async calls).
3327pub trait VMComponentAsyncStore {
3328    /// A helper function for fused adapter modules involving calls where the
3329    /// one of the caller or callee is async.
3330    ///
3331    /// This helper is not used when the caller and callee both use the sync
3332    /// ABI, only when at least one is async is this used.
3333    unsafe fn prepare_call(
3334        &mut self,
3335        instance: Instance,
3336        memory: *mut VMMemoryDefinition,
3337        start: *mut VMFuncRef,
3338        return_: *mut VMFuncRef,
3339        caller_instance: RuntimeComponentInstanceIndex,
3340        callee_instance: RuntimeComponentInstanceIndex,
3341        task_return_type: TypeTupleIndex,
3342        string_encoding: u8,
3343        result_count: u32,
3344        storage: *mut ValRaw,
3345        storage_len: usize,
3346    ) -> Result<()>;
3347
3348    /// A helper function for fused adapter modules involving calls where the
3349    /// caller is sync-lowered but the callee is async-lifted.
3350    unsafe fn sync_start(
3351        &mut self,
3352        instance: Instance,
3353        callback: *mut VMFuncRef,
3354        callee: *mut VMFuncRef,
3355        param_count: u32,
3356        storage: *mut MaybeUninit<ValRaw>,
3357        storage_len: usize,
3358    ) -> Result<()>;
3359
3360    /// A helper function for fused adapter modules involving calls where the
3361    /// caller is async-lowered.
3362    unsafe fn async_start(
3363        &mut self,
3364        instance: Instance,
3365        callback: *mut VMFuncRef,
3366        post_return: *mut VMFuncRef,
3367        callee: *mut VMFuncRef,
3368        param_count: u32,
3369        result_count: u32,
3370        flags: u32,
3371    ) -> Result<u32>;
3372
3373    /// The `future.write` intrinsic.
3374    fn future_write(
3375        &mut self,
3376        instance: Instance,
3377        caller: RuntimeComponentInstanceIndex,
3378        ty: TypeFutureTableIndex,
3379        options: OptionsIndex,
3380        future: u32,
3381        address: u32,
3382    ) -> Result<u32>;
3383
3384    /// The `future.read` intrinsic.
3385    fn future_read(
3386        &mut self,
3387        instance: Instance,
3388        caller: RuntimeComponentInstanceIndex,
3389        ty: TypeFutureTableIndex,
3390        options: OptionsIndex,
3391        future: u32,
3392        address: u32,
3393    ) -> Result<u32>;
3394
3395    /// The `future.drop-writable` intrinsic.
3396    fn future_drop_writable(
3397        &mut self,
3398        instance: Instance,
3399        caller: RuntimeComponentInstanceIndex,
3400        ty: TypeFutureTableIndex,
3401        writer: u32,
3402    ) -> Result<()>;
3403
3404    /// The `stream.write` intrinsic.
3405    fn stream_write(
3406        &mut self,
3407        instance: Instance,
3408        caller: RuntimeComponentInstanceIndex,
3409        ty: TypeStreamTableIndex,
3410        options: OptionsIndex,
3411        stream: u32,
3412        address: u32,
3413        count: u32,
3414    ) -> Result<u32>;
3415
3416    /// The `stream.read` intrinsic.
3417    fn stream_read(
3418        &mut self,
3419        instance: Instance,
3420        caller: RuntimeComponentInstanceIndex,
3421        ty: TypeStreamTableIndex,
3422        options: OptionsIndex,
3423        stream: u32,
3424        address: u32,
3425        count: u32,
3426    ) -> Result<u32>;
3427
3428    /// The "fast-path" implementation of the `stream.write` intrinsic for
3429    /// "flat" (i.e. memcpy-able) payloads.
3430    fn flat_stream_write(
3431        &mut self,
3432        instance: Instance,
3433        caller: RuntimeComponentInstanceIndex,
3434        ty: TypeStreamTableIndex,
3435        options: OptionsIndex,
3436        payload_size: u32,
3437        payload_align: u32,
3438        stream: u32,
3439        address: u32,
3440        count: u32,
3441    ) -> Result<u32>;
3442
3443    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3444    /// (i.e. memcpy-able) payloads.
3445    fn flat_stream_read(
3446        &mut self,
3447        instance: Instance,
3448        caller: RuntimeComponentInstanceIndex,
3449        ty: TypeStreamTableIndex,
3450        options: OptionsIndex,
3451        payload_size: u32,
3452        payload_align: u32,
3453        stream: u32,
3454        address: u32,
3455        count: u32,
3456    ) -> Result<u32>;
3457
3458    /// The `stream.drop-writable` intrinsic.
3459    fn stream_drop_writable(
3460        &mut self,
3461        instance: Instance,
3462        caller: RuntimeComponentInstanceIndex,
3463        ty: TypeStreamTableIndex,
3464        writer: u32,
3465    ) -> Result<()>;
3466
3467    /// The `error-context.debug-message` intrinsic.
3468    fn error_context_debug_message(
3469        &mut self,
3470        instance: Instance,
3471        caller: RuntimeComponentInstanceIndex,
3472        ty: TypeComponentLocalErrorContextTableIndex,
3473        options: OptionsIndex,
3474        err_ctx_handle: u32,
3475        debug_msg_address: u32,
3476    ) -> Result<()>;
3477
3478    /// The `thread.new_indirect` intrinsic
3479    fn thread_new_indirect(
3480        &mut self,
3481        instance: Instance,
3482        caller: RuntimeComponentInstanceIndex,
3483        func_ty_idx: TypeFuncIndex,
3484        start_func_table_idx: RuntimeTableIndex,
3485        start_func_idx: u32,
3486        context: i32,
3487    ) -> Result<u32>;
3488}
3489
3490/// SAFETY: See trait docs.
3491impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3492    unsafe fn prepare_call(
3493        &mut self,
3494        instance: Instance,
3495        memory: *mut VMMemoryDefinition,
3496        start: *mut VMFuncRef,
3497        return_: *mut VMFuncRef,
3498        caller_instance: RuntimeComponentInstanceIndex,
3499        callee_instance: RuntimeComponentInstanceIndex,
3500        task_return_type: TypeTupleIndex,
3501        string_encoding: u8,
3502        result_count_or_max_if_async: u32,
3503        storage: *mut ValRaw,
3504        storage_len: usize,
3505    ) -> Result<()> {
3506        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3507        // this method will have ensured that `storage` is a valid
3508        // pointer containing at least `storage_len` items.
3509        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3510
3511        unsafe {
3512            instance.prepare_call(
3513                StoreContextMut(self),
3514                start,
3515                return_,
3516                caller_instance,
3517                callee_instance,
3518                task_return_type,
3519                memory,
3520                string_encoding,
3521                match result_count_or_max_if_async {
3522                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3523                        params,
3524                        has_result: false,
3525                    },
3526                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3527                        params,
3528                        has_result: true,
3529                    },
3530                    result_count => CallerInfo::Sync {
3531                        params,
3532                        result_count,
3533                    },
3534                },
3535            )
3536        }
3537    }
3538
3539    unsafe fn sync_start(
3540        &mut self,
3541        instance: Instance,
3542        callback: *mut VMFuncRef,
3543        callee: *mut VMFuncRef,
3544        param_count: u32,
3545        storage: *mut MaybeUninit<ValRaw>,
3546        storage_len: usize,
3547    ) -> Result<()> {
3548        unsafe {
3549            instance
3550                .start_call(
3551                    StoreContextMut(self),
3552                    callback,
3553                    ptr::null_mut(),
3554                    callee,
3555                    param_count,
3556                    1,
3557                    START_FLAG_ASYNC_CALLEE,
3558                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3559                    // this method will have ensured that `storage` is a valid
3560                    // pointer containing at least `storage_len` items.
3561                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3562                )
3563                .map(drop)
3564        }
3565    }
3566
3567    unsafe fn async_start(
3568        &mut self,
3569        instance: Instance,
3570        callback: *mut VMFuncRef,
3571        post_return: *mut VMFuncRef,
3572        callee: *mut VMFuncRef,
3573        param_count: u32,
3574        result_count: u32,
3575        flags: u32,
3576    ) -> Result<u32> {
3577        unsafe {
3578            instance.start_call(
3579                StoreContextMut(self),
3580                callback,
3581                post_return,
3582                callee,
3583                param_count,
3584                result_count,
3585                flags,
3586                None,
3587            )
3588        }
3589    }
3590
3591    fn future_write(
3592        &mut self,
3593        instance: Instance,
3594        caller: RuntimeComponentInstanceIndex,
3595        ty: TypeFutureTableIndex,
3596        options: OptionsIndex,
3597        future: u32,
3598        address: u32,
3599    ) -> Result<u32> {
3600        instance.id().get(self).check_may_leave(caller)?;
3601        instance
3602            .guest_write(
3603                StoreContextMut(self),
3604                TransmitIndex::Future(ty),
3605                options,
3606                None,
3607                future,
3608                address,
3609                1,
3610            )
3611            .map(|result| result.encode())
3612    }
3613
3614    fn future_read(
3615        &mut self,
3616        instance: Instance,
3617        caller: RuntimeComponentInstanceIndex,
3618        ty: TypeFutureTableIndex,
3619        options: OptionsIndex,
3620        future: u32,
3621        address: u32,
3622    ) -> Result<u32> {
3623        instance.id().get(self).check_may_leave(caller)?;
3624        instance
3625            .guest_read(
3626                StoreContextMut(self),
3627                TransmitIndex::Future(ty),
3628                options,
3629                None,
3630                future,
3631                address,
3632                1,
3633            )
3634            .map(|result| result.encode())
3635    }
3636
3637    fn stream_write(
3638        &mut self,
3639        instance: Instance,
3640        caller: RuntimeComponentInstanceIndex,
3641        ty: TypeStreamTableIndex,
3642        options: OptionsIndex,
3643        stream: u32,
3644        address: u32,
3645        count: u32,
3646    ) -> Result<u32> {
3647        instance.id().get(self).check_may_leave(caller)?;
3648        instance
3649            .guest_write(
3650                StoreContextMut(self),
3651                TransmitIndex::Stream(ty),
3652                options,
3653                None,
3654                stream,
3655                address,
3656                count,
3657            )
3658            .map(|result| result.encode())
3659    }
3660
3661    fn stream_read(
3662        &mut self,
3663        instance: Instance,
3664        caller: RuntimeComponentInstanceIndex,
3665        ty: TypeStreamTableIndex,
3666        options: OptionsIndex,
3667        stream: u32,
3668        address: u32,
3669        count: u32,
3670    ) -> Result<u32> {
3671        instance.id().get(self).check_may_leave(caller)?;
3672        instance
3673            .guest_read(
3674                StoreContextMut(self),
3675                TransmitIndex::Stream(ty),
3676                options,
3677                None,
3678                stream,
3679                address,
3680                count,
3681            )
3682            .map(|result| result.encode())
3683    }
3684
3685    fn future_drop_writable(
3686        &mut self,
3687        instance: Instance,
3688        caller: RuntimeComponentInstanceIndex,
3689        ty: TypeFutureTableIndex,
3690        writer: u32,
3691    ) -> Result<()> {
3692        instance.id().get(self).check_may_leave(caller)?;
3693        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3694    }
3695
3696    fn flat_stream_write(
3697        &mut self,
3698        instance: Instance,
3699        caller: RuntimeComponentInstanceIndex,
3700        ty: TypeStreamTableIndex,
3701        options: OptionsIndex,
3702        payload_size: u32,
3703        payload_align: u32,
3704        stream: u32,
3705        address: u32,
3706        count: u32,
3707    ) -> Result<u32> {
3708        instance.id().get(self).check_may_leave(caller)?;
3709        instance
3710            .guest_write(
3711                StoreContextMut(self),
3712                TransmitIndex::Stream(ty),
3713                options,
3714                Some(FlatAbi {
3715                    size: payload_size,
3716                    align: payload_align,
3717                }),
3718                stream,
3719                address,
3720                count,
3721            )
3722            .map(|result| result.encode())
3723    }
3724
3725    fn flat_stream_read(
3726        &mut self,
3727        instance: Instance,
3728        caller: RuntimeComponentInstanceIndex,
3729        ty: TypeStreamTableIndex,
3730        options: OptionsIndex,
3731        payload_size: u32,
3732        payload_align: u32,
3733        stream: u32,
3734        address: u32,
3735        count: u32,
3736    ) -> Result<u32> {
3737        instance.id().get(self).check_may_leave(caller)?;
3738        instance
3739            .guest_read(
3740                StoreContextMut(self),
3741                TransmitIndex::Stream(ty),
3742                options,
3743                Some(FlatAbi {
3744                    size: payload_size,
3745                    align: payload_align,
3746                }),
3747                stream,
3748                address,
3749                count,
3750            )
3751            .map(|result| result.encode())
3752    }
3753
3754    fn stream_drop_writable(
3755        &mut self,
3756        instance: Instance,
3757        caller: RuntimeComponentInstanceIndex,
3758        ty: TypeStreamTableIndex,
3759        writer: u32,
3760    ) -> Result<()> {
3761        instance.id().get(self).check_may_leave(caller)?;
3762        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3763    }
3764
3765    fn error_context_debug_message(
3766        &mut self,
3767        instance: Instance,
3768        caller: RuntimeComponentInstanceIndex,
3769        ty: TypeComponentLocalErrorContextTableIndex,
3770        options: OptionsIndex,
3771        err_ctx_handle: u32,
3772        debug_msg_address: u32,
3773    ) -> Result<()> {
3774        instance.id().get(self).check_may_leave(caller)?;
3775        instance.error_context_debug_message(
3776            StoreContextMut(self),
3777            ty,
3778            options,
3779            err_ctx_handle,
3780            debug_msg_address,
3781        )
3782    }
3783
3784    fn thread_new_indirect(
3785        &mut self,
3786        instance: Instance,
3787        caller: RuntimeComponentInstanceIndex,
3788        func_ty_idx: TypeFuncIndex,
3789        start_func_table_idx: RuntimeTableIndex,
3790        start_func_idx: u32,
3791        context: i32,
3792    ) -> Result<u32> {
3793        instance.thread_new_indirect(
3794            StoreContextMut(self),
3795            caller,
3796            func_ty_idx,
3797            start_func_table_idx,
3798            start_func_idx,
3799            context,
3800        )
3801    }
3802}
3803
3804type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3805
3806/// Represents the state of a pending host task.
3807struct HostTask {
3808    common: WaitableCommon,
3809    caller_instance: RuntimeComponentInstanceIndex,
3810    join_handle: Option<JoinHandle>,
3811}
3812
3813impl HostTask {
3814    fn new(
3815        caller_instance: RuntimeComponentInstanceIndex,
3816        join_handle: Option<JoinHandle>,
3817    ) -> Self {
3818        Self {
3819            common: WaitableCommon::default(),
3820            caller_instance,
3821            join_handle,
3822        }
3823    }
3824}
3825
3826impl TableDebug for HostTask {
3827    fn type_name() -> &'static str {
3828        "HostTask"
3829    }
3830}
3831
3832type CallbackFn = Box<
3833    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3834        + Send
3835        + Sync
3836        + 'static,
3837>;
3838
3839/// Represents the caller of a given guest task.
3840enum Caller {
3841    /// The host called the guest task.
3842    Host {
3843        /// If present, may be used to deliver the result.
3844        tx: Option<oneshot::Sender<LiftedResult>>,
3845        /// Channel to notify once all subtasks spawned by this caller have
3846        /// completed.
3847        ///
3848        /// Note that we'll never actually send anything to this channel;
3849        /// dropping it when the refcount goes to zero is sufficient to notify
3850        /// the receiver.
3851        exit_tx: Arc<oneshot::Sender<()>>,
3852        /// If true, there's a host future that must be dropped before the task
3853        /// can be deleted.
3854        host_future_present: bool,
3855        /// If true, call `post-return` function (if any) automatically.
3856        call_post_return_automatically: bool,
3857    },
3858    /// Another guest thread called the guest task
3859    Guest {
3860        /// The id of the caller
3861        thread: QualifiedThreadId,
3862        /// The instance to use to enforce reentrance rules.
3863        ///
3864        /// Note that this might not be the same as the instance the caller task
3865        /// started executing in given that one or more synchronous guest->guest
3866        /// calls may have occurred involving multiple instances.
3867        instance: RuntimeComponentInstanceIndex,
3868    },
3869}
3870
3871/// Represents a closure and related canonical ABI parameters required to
3872/// validate a `task.return` call at runtime and lift the result.
3873struct LiftResult {
3874    lift: RawLift,
3875    ty: TypeTupleIndex,
3876    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3877    string_encoding: StringEncoding,
3878}
3879
3880/// The table ID for a guest thread, qualified by the task to which it belongs.
3881///
3882/// This exists to minimize table lookups and the necessity to pass stores around mutably
3883/// for the common case of identifying the task to which a thread belongs.
3884#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3885struct QualifiedThreadId {
3886    task: TableId<GuestTask>,
3887    thread: TableId<GuestThread>,
3888}
3889
3890impl QualifiedThreadId {
3891    fn qualify(
3892        state: &mut ConcurrentState,
3893        thread: TableId<GuestThread>,
3894    ) -> Result<QualifiedThreadId> {
3895        Ok(QualifiedThreadId {
3896            task: state.get_mut(thread)?.parent_task,
3897            thread,
3898        })
3899    }
3900}
3901
3902impl fmt::Debug for QualifiedThreadId {
3903    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3904        f.debug_tuple("QualifiedThreadId")
3905            .field(&self.task.rep())
3906            .field(&self.thread.rep())
3907            .finish()
3908    }
3909}
3910
3911enum GuestThreadState {
3912    NotStartedImplicit,
3913    NotStartedExplicit(
3914        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
3915    ),
3916    Running,
3917    Suspended(StoreFiber<'static>),
3918    Pending,
3919    Completed,
3920}
3921pub struct GuestThread {
3922    /// Context-local state used to implement the `context.{get,set}`
3923    /// intrinsics.
3924    context: [u32; 2],
3925    /// The owning guest task.
3926    parent_task: TableId<GuestTask>,
3927    /// If present, indicates that the thread is currently waiting on the
3928    /// specified set but may be cancelled and woken immediately.
3929    wake_on_cancel: Option<TableId<WaitableSet>>,
3930    /// The execution state of this guest thread
3931    state: GuestThreadState,
3932    /// The index of this thread in the component instance's handle table.
3933    /// This must always be `Some` after initialization.
3934    instance_rep: Option<u32>,
3935}
3936
3937impl GuestThread {
3938    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
3939    /// handle.
3940    fn from_instance(
3941        state: Pin<&mut ComponentInstance>,
3942        caller_instance: RuntimeComponentInstanceIndex,
3943        guest_thread: u32,
3944    ) -> Result<TableId<Self>> {
3945        let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3946        Ok(TableId::new(rep))
3947    }
3948
3949    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3950        Self {
3951            context: [0; 2],
3952            parent_task,
3953            wake_on_cancel: None,
3954            state: GuestThreadState::NotStartedImplicit,
3955            instance_rep: None,
3956        }
3957    }
3958
3959    fn new_explicit(
3960        parent_task: TableId<GuestTask>,
3961        start_func: Box<
3962            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3963        >,
3964    ) -> Self {
3965        Self {
3966            context: [0; 2],
3967            parent_task,
3968            wake_on_cancel: None,
3969            state: GuestThreadState::NotStartedExplicit(start_func),
3970            instance_rep: None,
3971        }
3972    }
3973}
3974
3975impl TableDebug for GuestThread {
3976    fn type_name() -> &'static str {
3977        "GuestThread"
3978    }
3979}
3980
3981enum SyncResult {
3982    NotProduced,
3983    Produced(Option<ValRaw>),
3984    Taken,
3985}
3986
3987impl SyncResult {
3988    fn take(&mut self) -> Option<Option<ValRaw>> {
3989        match mem::replace(self, SyncResult::Taken) {
3990            SyncResult::NotProduced => None,
3991            SyncResult::Produced(val) => Some(val),
3992            SyncResult::Taken => {
3993                panic!("attempted to take a synchronous result that was already taken")
3994            }
3995        }
3996    }
3997}
3998
3999#[derive(Debug)]
4000enum HostFutureState {
4001    NotApplicable,
4002    Live,
4003    Dropped,
4004}
4005
4006/// Represents a pending guest task.
4007pub(crate) struct GuestTask {
4008    /// See `WaitableCommon`
4009    common: WaitableCommon,
4010    /// Closure to lower the parameters passed to this task.
4011    lower_params: Option<RawLower>,
4012    /// See `LiftResult`
4013    lift_result: Option<LiftResult>,
4014    /// A place to stash the type-erased lifted result if it can't be delivered
4015    /// immediately.
4016    result: Option<LiftedResult>,
4017    /// Closure to call the callback function for an async-lifted export, if
4018    /// provided.
4019    callback: Option<CallbackFn>,
4020    /// See `Caller`
4021    caller: Caller,
4022    /// A place to stash the call context for managing resource borrows while
4023    /// switching between guest tasks.
4024    call_context: Option<CallContext>,
4025    /// A place to stash the lowered result for a sync-to-async call until it
4026    /// can be returned to the caller.
4027    sync_result: SyncResult,
4028    /// Whether or not the task has been cancelled (i.e. whether the task is
4029    /// permitted to call `task.cancel`).
4030    cancel_sent: bool,
4031    /// Whether or not we've sent a `Status::Starting` event to any current or
4032    /// future waiters for this waitable.
4033    starting_sent: bool,
4034    /// Pending guest subtasks created by this task (directly or indirectly).
4035    ///
4036    /// This is used to re-parent subtasks which are still running when their
4037    /// parent task is disposed.
4038    subtasks: HashSet<TableId<GuestTask>>,
4039    /// Scratch waitable set used to watch subtasks during synchronous calls.
4040    sync_call_set: TableId<WaitableSet>,
4041    /// The instance to which the exported function for this guest task belongs.
4042    ///
4043    /// Note that the task may do a sync->sync call via a fused adapter which
4044    /// results in that task executing code in a different instance, and it may
4045    /// call host functions and intrinsics from that other instance.
4046    instance: RuntimeComponentInstanceIndex,
4047    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4048    /// delivered to this task.
4049    event: Option<Event>,
4050    /// The `ExportIndex` of the guest function being called, if known.
4051    function_index: Option<ExportIndex>,
4052    /// Whether or not the task has exited.
4053    exited: bool,
4054    /// Threads belonging to this task
4055    threads: HashSet<TableId<GuestThread>>,
4056    /// The state of the host future that represents an async task, which must
4057    /// be dropped before we can delete the task.
4058    host_future_state: HostFutureState,
4059}
4060
4061impl GuestTask {
4062    fn already_lowered_parameters(&self) -> bool {
4063        // We reset `lower_params` after we lower the parameters
4064        self.lower_params.is_none()
4065    }
4066    fn returned_or_cancelled(&self) -> bool {
4067        // We reset `lift_result` after we return or exit
4068        self.lift_result.is_none()
4069    }
4070    fn ready_to_delete(&self) -> bool {
4071        let threads_completed = self.threads.is_empty();
4072        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4073        let pending_completion_event = matches!(
4074            self.common.event,
4075            Some(Event::Subtask {
4076                status: Status::Returned | Status::ReturnCancelled
4077            })
4078        );
4079        let ready = threads_completed
4080            && !has_sync_result
4081            && !pending_completion_event
4082            && !matches!(self.host_future_state, HostFutureState::Live);
4083        log::trace!(
4084            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4085            threads_completed,
4086            has_sync_result,
4087            pending_completion_event,
4088            self.host_future_state
4089        );
4090        ready
4091    }
4092    fn new(
4093        state: &mut ConcurrentState,
4094        lower_params: RawLower,
4095        lift_result: LiftResult,
4096        caller: Caller,
4097        callback: Option<CallbackFn>,
4098        component_instance: RuntimeComponentInstanceIndex,
4099    ) -> Result<Self> {
4100        let sync_call_set = state.push(WaitableSet::default())?;
4101        let host_future_state = match &caller {
4102            Caller::Guest { .. } => HostFutureState::NotApplicable,
4103            Caller::Host {
4104                host_future_present,
4105                ..
4106            } => {
4107                if *host_future_present {
4108                    HostFutureState::Live
4109                } else {
4110                    HostFutureState::NotApplicable
4111                }
4112            }
4113        };
4114        Ok(Self {
4115            common: WaitableCommon::default(),
4116            lower_params: Some(lower_params),
4117            lift_result: Some(lift_result),
4118            result: None,
4119            callback,
4120            caller,
4121            call_context: Some(CallContext::default()),
4122            sync_result: SyncResult::NotProduced,
4123            cancel_sent: false,
4124            starting_sent: false,
4125            subtasks: HashSet::new(),
4126            sync_call_set,
4127            instance: component_instance,
4128            event: None,
4129            function_index: None,
4130            exited: false,
4131            threads: HashSet::new(),
4132            host_future_state,
4133        })
4134    }
4135
4136    /// Dispose of this guest task, reparenting any pending subtasks to the
4137    /// caller.
4138    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4139        // If there are not-yet-delivered completion events for subtasks in
4140        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4141        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4142            if let Some(Event::Subtask {
4143                status: Status::Returned | Status::ReturnCancelled,
4144            }) = waitable.common(state)?.event
4145            {
4146                waitable.delete_from(state)?;
4147            }
4148        }
4149
4150        assert!(self.threads.is_empty());
4151
4152        state.delete(self.sync_call_set)?;
4153
4154        // Reparent any pending subtasks to the caller.
4155        match &self.caller {
4156            Caller::Guest {
4157                thread,
4158                instance: runtime_instance,
4159            } => {
4160                let task_mut = state.get_mut(thread.task)?;
4161                let present = task_mut.subtasks.remove(&me);
4162                assert!(present);
4163
4164                for subtask in &self.subtasks {
4165                    task_mut.subtasks.insert(*subtask);
4166                }
4167
4168                for subtask in &self.subtasks {
4169                    state.get_mut(*subtask)?.caller = Caller::Guest {
4170                        thread: *thread,
4171                        instance: *runtime_instance,
4172                    };
4173                }
4174            }
4175            Caller::Host { exit_tx, .. } => {
4176                for subtask in &self.subtasks {
4177                    state.get_mut(*subtask)?.caller = Caller::Host {
4178                        tx: None,
4179                        // Clone `exit_tx` to ensure that it is only dropped
4180                        // once all transitive subtasks of the host call have
4181                        // exited:
4182                        exit_tx: exit_tx.clone(),
4183                        host_future_present: false,
4184                        call_post_return_automatically: true,
4185                    };
4186                }
4187            }
4188        }
4189
4190        for subtask in self.subtasks {
4191            if state.get_mut(subtask)?.exited {
4192                Waitable::Guest(subtask).delete_from(state)?;
4193            }
4194        }
4195
4196        Ok(())
4197    }
4198
4199    fn call_post_return_automatically(&self) -> bool {
4200        matches!(
4201            self.caller,
4202            Caller::Guest { .. }
4203                | Caller::Host {
4204                    call_post_return_automatically: true,
4205                    ..
4206                }
4207        )
4208    }
4209}
4210
4211impl TableDebug for GuestTask {
4212    fn type_name() -> &'static str {
4213        "GuestTask"
4214    }
4215}
4216
4217/// Represents state common to all kinds of waitables.
4218#[derive(Default)]
4219struct WaitableCommon {
4220    /// The currently pending event for this waitable, if any.
4221    event: Option<Event>,
4222    /// The set to which this waitable belongs, if any.
4223    set: Option<TableId<WaitableSet>>,
4224    /// The handle with which the guest refers to this waitable, if any.
4225    handle: Option<u32>,
4226}
4227
4228/// Represents a Component Model Async `waitable`.
4229#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4230enum Waitable {
4231    /// A host task
4232    Host(TableId<HostTask>),
4233    /// A guest task
4234    Guest(TableId<GuestTask>),
4235    /// The read or write end of a stream or future
4236    Transmit(TableId<TransmitHandle>),
4237}
4238
4239impl Waitable {
4240    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4241    /// handle.
4242    fn from_instance(
4243        state: Pin<&mut ComponentInstance>,
4244        caller_instance: RuntimeComponentInstanceIndex,
4245        waitable: u32,
4246    ) -> Result<Self> {
4247        use crate::runtime::vm::component::Waitable;
4248
4249        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4250
4251        Ok(match kind {
4252            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4253            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4254            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4255        })
4256    }
4257
4258    /// Retrieve the host-visible identifier for this `Waitable`.
4259    fn rep(&self) -> u32 {
4260        match self {
4261            Self::Host(id) => id.rep(),
4262            Self::Guest(id) => id.rep(),
4263            Self::Transmit(id) => id.rep(),
4264        }
4265    }
4266
4267    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4268    /// remove it from any set it may currently belong to (when `set` is
4269    /// `None`).
4270    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4271        log::trace!("waitable {self:?} join set {set:?}",);
4272
4273        let old = mem::replace(&mut self.common(state)?.set, set);
4274
4275        if let Some(old) = old {
4276            match *self {
4277                Waitable::Host(id) => state.remove_child(id, old),
4278                Waitable::Guest(id) => state.remove_child(id, old),
4279                Waitable::Transmit(id) => state.remove_child(id, old),
4280            }?;
4281
4282            state.get_mut(old)?.ready.remove(self);
4283        }
4284
4285        if let Some(set) = set {
4286            match *self {
4287                Waitable::Host(id) => state.add_child(id, set),
4288                Waitable::Guest(id) => state.add_child(id, set),
4289                Waitable::Transmit(id) => state.add_child(id, set),
4290            }?;
4291
4292            if self.common(state)?.event.is_some() {
4293                self.mark_ready(state)?;
4294            }
4295        }
4296
4297        Ok(())
4298    }
4299
4300    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4301    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4302        Ok(match self {
4303            Self::Host(id) => &mut state.get_mut(*id)?.common,
4304            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4305            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4306        })
4307    }
4308
4309    /// Set or clear the pending event for this waitable and either deliver it
4310    /// to the first waiter, if any, or mark it as ready to be delivered to the
4311    /// next waiter that arrives.
4312    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4313        log::trace!("set event for {self:?}: {event:?}");
4314        self.common(state)?.event = event;
4315        self.mark_ready(state)
4316    }
4317
4318    /// Take the pending event from this waitable, leaving `None` in its place.
4319    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4320        let common = self.common(state)?;
4321        let event = common.event.take();
4322        if let Some(set) = self.common(state)?.set {
4323            state.get_mut(set)?.ready.remove(self);
4324        }
4325
4326        Ok(event)
4327    }
4328
4329    /// Deliver the current event for this waitable to the first waiter, if any,
4330    /// or else mark it as ready to be delivered to the next waiter that
4331    /// arrives.
4332    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4333        if let Some(set) = self.common(state)?.set {
4334            state.get_mut(set)?.ready.insert(*self);
4335            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4336                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4337                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4338
4339                let item = match mode {
4340                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4341                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4342                        thread,
4343                        kind: GuestCallKind::DeliverEvent {
4344                            instance,
4345                            set: Some(set),
4346                        },
4347                    }),
4348                };
4349                state.push_high_priority(item);
4350            }
4351        }
4352        Ok(())
4353    }
4354
4355    /// Remove this waitable from the instance's rep table.
4356    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4357        match self {
4358            Self::Host(task) => {
4359                log::trace!("delete host task {task:?}");
4360                state.delete(*task)?;
4361            }
4362            Self::Guest(task) => {
4363                log::trace!("delete guest task {task:?}");
4364                state.delete(*task)?.dispose(state, *task)?;
4365            }
4366            Self::Transmit(task) => {
4367                state.delete(*task)?;
4368            }
4369        }
4370
4371        Ok(())
4372    }
4373}
4374
4375impl fmt::Debug for Waitable {
4376    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4377        match self {
4378            Self::Host(id) => write!(f, "{id:?}"),
4379            Self::Guest(id) => write!(f, "{id:?}"),
4380            Self::Transmit(id) => write!(f, "{id:?}"),
4381        }
4382    }
4383}
4384
4385/// Represents a Component Model Async `waitable-set`.
4386#[derive(Default)]
4387struct WaitableSet {
4388    /// Which waitables in this set have pending events, if any.
4389    ready: BTreeSet<Waitable>,
4390    /// Which guest threads are currently waiting on this set, if any.
4391    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4392}
4393
4394impl TableDebug for WaitableSet {
4395    fn type_name() -> &'static str {
4396        "WaitableSet"
4397    }
4398}
4399
4400/// Type-erased closure to lower the parameters for a guest task.
4401type RawLower =
4402    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4403
4404/// Type-erased closure to lift the result for a guest task.
4405type RawLift = Box<
4406    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4407>;
4408
4409/// Type erased result of a guest task which may be downcast to the expected
4410/// type by a host caller (or simply ignored in the case of a guest caller; see
4411/// `DummyResult`).
4412type LiftedResult = Box<dyn Any + Send + Sync>;
4413
4414/// Used to return a result from a `LiftFn` when the actual result has already
4415/// been lowered to a guest task's stack and linear memory.
4416struct DummyResult;
4417
4418/// Represents the Component Model Async state of a (sub-)component instance.
4419#[derive(Default)]
4420struct InstanceState {
4421    /// Whether backpressure is set for this instance (enabled if >0)
4422    backpressure: u16,
4423    /// Whether this instance can be entered
4424    do_not_enter: bool,
4425    /// Pending calls for this instance which require `Self::backpressure` to be
4426    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4427    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4428}
4429
4430/// Represents the Component Model Async state of a store.
4431pub struct ConcurrentState {
4432    /// The currently running guest thread, if any.
4433    guest_thread: Option<QualifiedThreadId>,
4434
4435    /// The set of pending host and background tasks, if any.
4436    ///
4437    /// See `ComponentInstance::poll_until` for where we temporarily take this
4438    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4439    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4440    /// The table of waitables, waitable sets, etc.
4441    table: AlwaysMut<ResourceTable>,
4442    /// Per (sub-)component instance states.
4443    ///
4444    /// See `InstanceState` for details and note that this map is lazily
4445    /// populated as needed.
4446    // TODO: this can and should be a `PrimaryMap`
4447    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4448    /// The "high priority" work queue for this store's event loop.
4449    high_priority: Vec<WorkItem>,
4450    /// The "low priority" work queue for this store's event loop.
4451    low_priority: Vec<WorkItem>,
4452    /// A place to stash the reason a fiber is suspending so that the code which
4453    /// resumed it will know under what conditions the fiber should be resumed
4454    /// again.
4455    suspend_reason: Option<SuspendReason>,
4456    /// A cached fiber which is waiting for work to do.
4457    ///
4458    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4459    worker: Option<StoreFiber<'static>>,
4460    /// A place to stash the work item for which we're resuming a worker fiber.
4461    worker_item: Option<WorkerItem>,
4462
4463    /// Reference counts for all component error contexts
4464    ///
4465    /// NOTE: it is possible the global ref count to be *greater* than the sum of
4466    /// (sub)component ref counts as tracked by `error_context_tables`, for
4467    /// example when the host holds one or more references to error contexts.
4468    ///
4469    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4470    /// component-wide representation) of the index into concurrent state for a given
4471    /// stored `ErrorContext`.
4472    ///
4473    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4474    /// as a `TableId<ErrorContextState>`.
4475    global_error_context_ref_counts:
4476        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4477}
4478
4479impl Default for ConcurrentState {
4480    fn default() -> Self {
4481        Self {
4482            guest_thread: None,
4483            table: AlwaysMut::new(ResourceTable::new()),
4484            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4485            instance_states: HashMap::new(),
4486            high_priority: Vec::new(),
4487            low_priority: Vec::new(),
4488            suspend_reason: None,
4489            worker: None,
4490            worker_item: None,
4491            global_error_context_ref_counts: BTreeMap::new(),
4492        }
4493    }
4494}
4495
4496impl ConcurrentState {
4497    /// Take ownership of any fibers and futures owned by this object.
4498    ///
4499    /// This should be used when disposing of the `Store` containing this object
4500    /// in order to gracefully resolve any and all fibers using
4501    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4502    /// use-after-free bugs due to fibers which may still have access to the
4503    /// `Store`.
4504    ///
4505    /// Additionally, the futures collected with this function should be dropped
4506    /// within a `tls::set` call, which will ensure than any futures closing
4507    /// over an `&Accessor` will have access to the store when dropped, allowing
4508    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4509    /// panicking.
4510    ///
4511    /// Note that this will leave the object in an inconsistent and unusable
4512    /// state, so it should only be used just prior to dropping it.
4513    pub(crate) fn take_fibers_and_futures(
4514        &mut self,
4515        fibers: &mut Vec<StoreFiber<'static>>,
4516        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4517    ) {
4518        for entry in self.table.get_mut().iter_mut() {
4519            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4520                for mode in mem::take(&mut set.waiting).into_values() {
4521                    if let WaitMode::Fiber(fiber) = mode {
4522                        fibers.push(fiber);
4523                    }
4524                }
4525            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4526                if let GuestThreadState::Suspended(fiber) =
4527                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4528                {
4529                    fibers.push(fiber);
4530                }
4531            }
4532        }
4533
4534        if let Some(fiber) = self.worker.take() {
4535            fibers.push(fiber);
4536        }
4537
4538        let mut take_items = |list| {
4539            for item in mem::take(list) {
4540                match item {
4541                    WorkItem::ResumeFiber(fiber) => {
4542                        fibers.push(fiber);
4543                    }
4544                    WorkItem::PushFuture(future) => {
4545                        self.futures
4546                            .get_mut()
4547                            .as_mut()
4548                            .unwrap()
4549                            .push(future.into_inner());
4550                    }
4551                    _ => {}
4552                }
4553            }
4554        };
4555
4556        take_items(&mut self.high_priority);
4557        take_items(&mut self.low_priority);
4558
4559        if let Some(them) = self.futures.get_mut().take() {
4560            futures.push(them);
4561        }
4562    }
4563
4564    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4565        self.instance_states.entry(instance).or_default()
4566    }
4567
4568    fn push<V: Send + Sync + 'static>(
4569        &mut self,
4570        value: V,
4571    ) -> Result<TableId<V>, ResourceTableError> {
4572        self.table.get_mut().push(value).map(TableId::from)
4573    }
4574
4575    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4576        self.table.get_mut().get_mut(&Resource::from(id))
4577    }
4578
4579    pub fn add_child<T: 'static, U: 'static>(
4580        &mut self,
4581        child: TableId<T>,
4582        parent: TableId<U>,
4583    ) -> Result<(), ResourceTableError> {
4584        self.table
4585            .get_mut()
4586            .add_child(Resource::from(child), Resource::from(parent))
4587    }
4588
4589    pub fn remove_child<T: 'static, U: 'static>(
4590        &mut self,
4591        child: TableId<T>,
4592        parent: TableId<U>,
4593    ) -> Result<(), ResourceTableError> {
4594        self.table
4595            .get_mut()
4596            .remove_child(Resource::from(child), Resource::from(parent))
4597    }
4598
4599    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4600        self.table.get_mut().delete(Resource::from(id))
4601    }
4602
4603    fn push_future(&mut self, future: HostTaskFuture) {
4604        // Note that we can't directly push to `ConcurrentState::futures` here
4605        // since this may be called from a future that's being polled inside
4606        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4607        // so it has exclusive access while polling it.  Therefore, we push a
4608        // work item to the "high priority" queue, which will actually push to
4609        // `ConcurrentState::futures` later.
4610        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4611    }
4612
4613    fn push_high_priority(&mut self, item: WorkItem) {
4614        log::trace!("push high priority: {item:?}");
4615        self.high_priority.push(item);
4616    }
4617
4618    fn push_low_priority(&mut self, item: WorkItem) {
4619        log::trace!("push low priority: {item:?}");
4620        self.low_priority.push(item);
4621    }
4622
4623    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4624        if high_priority {
4625            self.push_high_priority(item);
4626        } else {
4627            self.push_low_priority(item);
4628        }
4629    }
4630
4631    /// Determine whether the instance associated with the specified guest task
4632    /// may be entered (i.e. is not already on the async call stack).
4633    ///
4634    /// This is an additional check on top of the "may_enter" instance flag;
4635    /// it's needed because async-lifted exports with callback functions must
4636    /// not call their own instances directly or indirectly, and due to the
4637    /// "stackless" nature of callback-enabled guest tasks this may happen even
4638    /// if there are no activation records on the stack (i.e. the "may_enter"
4639    /// field is `true`) for that instance.
4640    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4641        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4642
4643        // Walk the task tree back to the root, looking for potential
4644        // reentrance.
4645        //
4646        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4647        // such that each bit represents and instance which has been entered by
4648        // that task or an ancestor of that task, in which case this would be a
4649        // constant time check.
4650        loop {
4651            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4652                Caller::Host { .. } => break true,
4653                Caller::Guest { thread, instance } => {
4654                    if *instance == guest_instance {
4655                        break false;
4656                    } else {
4657                        *thread
4658                    }
4659                }
4660            };
4661            guest_task = next_thread.task;
4662        }
4663    }
4664
4665    /// Record that we're about to enter a (sub-)component instance which does
4666    /// not support more than one concurrent, stackful activation, meaning it
4667    /// cannot be entered again until the next call returns.
4668    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4669        self.instance_state(instance).do_not_enter = true;
4670    }
4671
4672    /// Record that we've exited a (sub-)component instance previously entered
4673    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
4674    /// See the documentation for the latter for details.
4675    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4676        self.instance_state(instance).do_not_enter = false;
4677        self.partition_pending(instance)
4678    }
4679
4680    /// Iterate over `InstanceState::pending`, moving any ready items into the
4681    /// "high priority" work item queue.
4682    ///
4683    /// See `GuestCall::is_ready` for details.
4684    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4685        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4686            let call = GuestCall { thread, kind };
4687            if call.is_ready(self)? {
4688                self.push_high_priority(WorkItem::GuestCall(call));
4689            } else {
4690                self.instance_state(instance)
4691                    .pending
4692                    .insert(call.thread, call.kind);
4693            }
4694        }
4695
4696        Ok(())
4697    }
4698
4699    /// Implements the `backpressure.{set,inc,dec}` intrinsics.
4700    pub(crate) fn backpressure_modify(
4701        &mut self,
4702        caller_instance: RuntimeComponentInstanceIndex,
4703        modify: impl FnOnce(u16) -> Option<u16>,
4704    ) -> Result<()> {
4705        let state = self.instance_state(caller_instance);
4706        let old = state.backpressure;
4707        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4708        state.backpressure = new;
4709
4710        if old > 0 && new == 0 {
4711            // Backpressure was previously enabled and is now disabled; move any
4712            // newly-eligible guest calls to the "high priority" queue.
4713            self.partition_pending(caller_instance)?;
4714        }
4715
4716        Ok(())
4717    }
4718
4719    /// Implements the `context.get` intrinsic.
4720    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4721        let thread = self.guest_thread.unwrap();
4722        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4723        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4724        Ok(val)
4725    }
4726
4727    /// Implements the `context.set` intrinsic.
4728    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4729        let thread = self.guest_thread.unwrap();
4730        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4731        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4732        Ok(())
4733    }
4734
4735    /// Returns whether there's a pending cancellation on the current guest thread,
4736    /// consuming the event if so.
4737    fn take_pending_cancellation(&mut self) -> bool {
4738        let thread = self.guest_thread.unwrap();
4739        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4740            assert!(matches!(event, Event::Cancelled));
4741            true
4742        } else {
4743            false
4744        }
4745    }
4746}
4747
4748/// Provide a type hint to compiler about the shape of a parameter lower
4749/// closure.
4750fn for_any_lower<
4751    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4752>(
4753    fun: F,
4754) -> F {
4755    fun
4756}
4757
4758/// Provide a type hint to compiler about the shape of a result lift closure.
4759fn for_any_lift<
4760    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4761>(
4762    fun: F,
4763) -> F {
4764    fun
4765}
4766
4767/// Wrap the specified future in a `poll_fn` which asserts that the future is
4768/// only polled from the event loop of the specified `Store`.
4769///
4770/// See `StoreContextMut::run_concurrent` for details.
4771fn checked<F: Future + Send + 'static>(
4772    id: StoreId,
4773    fut: F,
4774) -> impl Future<Output = F::Output> + Send + 'static {
4775    async move {
4776        let mut fut = pin!(fut);
4777        future::poll_fn(move |cx| {
4778            let message = "\
4779                `Future`s which depend on asynchronous component tasks, streams, or \
4780                futures to complete may only be polled from the event loop of the \
4781                store to which they belong.  Please use \
4782                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4783            ";
4784            tls::try_get(|store| {
4785                let matched = match store {
4786                    tls::TryGet::Some(store) => store.id() == id,
4787                    tls::TryGet::Taken | tls::TryGet::None => false,
4788                };
4789
4790                if !matched {
4791                    panic!("{message}")
4792                }
4793            });
4794            fut.as_mut().poll(cx)
4795        })
4796        .await
4797    }
4798}
4799
4800/// Assert that `StoreContextMut::run_concurrent` has not been called from
4801/// within an store's event loop.
4802fn check_recursive_run() {
4803    tls::try_get(|store| {
4804        if !matches!(store, tls::TryGet::None) {
4805            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4806        }
4807    });
4808}
4809
4810fn unpack_callback_code(code: u32) -> (u32, u32) {
4811    (code & 0xF, code >> 4)
4812}
4813
4814/// Helper struct for packaging parameters to be passed to
4815/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4816/// `waitable-set.poll`.
4817struct WaitableCheckParams {
4818    set: TableId<WaitableSet>,
4819    options: OptionsIndex,
4820    payload: u32,
4821}
4822
4823/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4824enum WaitableCheck {
4825    Wait(WaitableCheckParams),
4826    Poll(WaitableCheckParams),
4827}
4828
4829/// Represents a guest task called from the host, prepared using `prepare_call`.
4830pub(crate) struct PreparedCall<R> {
4831    /// The guest export to be called
4832    handle: Func,
4833    /// The guest thread created by `prepare_call`
4834    thread: QualifiedThreadId,
4835    /// The number of lowered core Wasm parameters to pass to the call.
4836    param_count: usize,
4837    /// The `oneshot::Receiver` to which the result of the call will be
4838    /// delivered when it is available.
4839    rx: oneshot::Receiver<LiftedResult>,
4840    /// The `oneshot::Receiver` which will resolve when the task -- and any
4841    /// transitive subtasks -- have all exited.
4842    exit_rx: oneshot::Receiver<()>,
4843    _phantom: PhantomData<R>,
4844}
4845
4846impl<R> PreparedCall<R> {
4847    /// Get a copy of the `TaskId` for this `PreparedCall`.
4848    pub(crate) fn task_id(&self) -> TaskId {
4849        TaskId {
4850            task: self.thread.task,
4851        }
4852    }
4853}
4854
4855/// Represents a task created by `prepare_call`.
4856pub(crate) struct TaskId {
4857    task: TableId<GuestTask>,
4858}
4859
4860impl TaskId {
4861    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
4862    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
4863    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
4864    /// and can be resumed by other tasks for this component, so we mark the future as dropped
4865    /// and delete the task when all threads are done.
4866    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4867        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4868        if !task.already_lowered_parameters() {
4869            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4870        } else {
4871            task.host_future_state = HostFutureState::Dropped;
4872            if task.ready_to_delete() {
4873                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4874            }
4875        }
4876        Ok(())
4877    }
4878}
4879
4880/// Prepare a call to the specified exported Wasm function, providing functions
4881/// for lowering the parameters and lifting the result.
4882///
4883/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4884/// loop, use `queue_call`.
4885pub(crate) fn prepare_call<T, R>(
4886    mut store: StoreContextMut<T>,
4887    handle: Func,
4888    param_count: usize,
4889    host_future_present: bool,
4890    call_post_return_automatically: bool,
4891    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4892    + Send
4893    + Sync
4894    + 'static,
4895    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4896    + Send
4897    + Sync
4898    + 'static,
4899) -> Result<PreparedCall<R>> {
4900    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4901
4902    let instance = handle.instance().id().get(store.0);
4903    let options = &instance.component().env_component().options[options];
4904    let task_return_type = instance.component().types()[ty].results;
4905    let component_instance = raw_options.instance;
4906    let callback = options.callback.map(|i| instance.runtime_callback(i));
4907    let memory = options
4908        .memory()
4909        .map(|i| instance.runtime_memory(i))
4910        .map(SendSyncPtr::new);
4911    let string_encoding = options.string_encoding;
4912    let token = StoreToken::new(store.as_context_mut());
4913    let state = store.0.concurrent_state_mut();
4914
4915    assert!(state.guest_thread.is_none());
4916
4917    let (tx, rx) = oneshot::channel();
4918    let (exit_tx, exit_rx) = oneshot::channel();
4919
4920    let mut task = GuestTask::new(
4921        state,
4922        Box::new(for_any_lower(move |store, params| {
4923            lower_params(handle, token.as_context_mut(store), params)
4924        })),
4925        LiftResult {
4926            lift: Box::new(for_any_lift(move |store, result| {
4927                lift_result(handle, store, result)
4928            })),
4929            ty: task_return_type,
4930            memory,
4931            string_encoding,
4932        },
4933        Caller::Host {
4934            tx: Some(tx),
4935            exit_tx: Arc::new(exit_tx),
4936            host_future_present,
4937            call_post_return_automatically,
4938        },
4939        callback.map(|callback| {
4940            let callback = SendSyncPtr::new(callback);
4941            let instance = handle.instance();
4942            Box::new(
4943                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
4944                    let store = token.as_context_mut(store);
4945                    // SAFETY: Per the contract of `prepare_call`, the callback
4946                    // will remain valid at least as long is this task exists.
4947                    unsafe {
4948                        instance.call_callback(
4949                            store,
4950                            runtime_instance,
4951                            callback,
4952                            event,
4953                            handle,
4954                            call_post_return_automatically,
4955                        )
4956                    }
4957                },
4958            ) as CallbackFn
4959        }),
4960        component_instance,
4961    )?;
4962    task.function_index = Some(handle.index());
4963
4964    let task = state.push(task)?;
4965    let thread = state.push(GuestThread::new_implicit(task))?;
4966    state.get_mut(task)?.threads.insert(thread);
4967
4968    Ok(PreparedCall {
4969        handle,
4970        thread: QualifiedThreadId { task, thread },
4971        param_count,
4972        rx,
4973        exit_rx,
4974        _phantom: PhantomData,
4975    })
4976}
4977
4978/// Queue a call previously prepared using `prepare_call` to be run as part of
4979/// the associated `ComponentInstance`'s event loop.
4980///
4981/// The returned future will resolve to the result once it is available, but
4982/// must only be polled via the instance's event loop. See
4983/// `StoreContextMut::run_concurrent` for details.
4984pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4985    mut store: StoreContextMut<T>,
4986    prepared: PreparedCall<R>,
4987) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4988    let PreparedCall {
4989        handle,
4990        thread,
4991        param_count,
4992        rx,
4993        exit_rx,
4994        ..
4995    } = prepared;
4996
4997    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
4998
4999    Ok(checked(
5000        store.0.id(),
5001        rx.map(move |result| {
5002            result
5003                .map(|v| (*v.downcast().unwrap(), exit_rx))
5004                .map_err(anyhow::Error::from)
5005        }),
5006    ))
5007}
5008
5009/// Queue a call previously prepared using `prepare_call` to be run as part of
5010/// the associated `ComponentInstance`'s event loop.
5011fn queue_call0<T: 'static>(
5012    store: StoreContextMut<T>,
5013    handle: Func,
5014    guest_thread: QualifiedThreadId,
5015    param_count: usize,
5016) -> Result<()> {
5017    let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5018    let is_concurrent = raw_options.async_;
5019    let callback = raw_options.callback;
5020    let instance = handle.instance();
5021    let callee = handle.lifted_core_func(store.0);
5022    let post_return = handle.post_return_core_func(store.0);
5023    let callback = callback.map(|i| {
5024        let instance = instance.id().get(store.0);
5025        SendSyncPtr::new(instance.runtime_callback(i))
5026    });
5027
5028    log::trace!("queueing call {guest_thread:?}");
5029
5030    let instance_flags = if callback.is_none() {
5031        None
5032    } else {
5033        Some(flags)
5034    };
5035
5036    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5037    // (with signatures appropriate for this call) and will remain valid as
5038    // long as this instance is valid.
5039    unsafe {
5040        instance.queue_call(
5041            store,
5042            guest_thread,
5043            SendSyncPtr::new(callee),
5044            param_count,
5045            1,
5046            instance_flags,
5047            is_concurrent,
5048            callback,
5049            post_return.map(SendSyncPtr::new),
5050        )
5051    }
5052}