wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func};
54use crate::component::store::StoreComponentInstanceId;
55use crate::component::{
56    ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{
61    CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62};
63use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
65use anyhow::{Context as _, Result, anyhow, bail};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, Either, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::slice;
84use std::sync::Arc;
85use std::task::{Context, Poll, Waker};
86use std::vec::Vec;
87use table::{TableDebug, TableId};
88use wasmtime_environ::Trap;
89use wasmtime_environ::component::{
90    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96
97pub use abort::JoinHandle;
98pub use future_stream_any::{FutureAny, StreamAny};
99pub use futures_and_streams::{
100    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103};
104pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105
106mod abort;
107mod error_contexts;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// All 32 bits are set, i.e. the value is equal to `u32::MAX`.
const BLOCKED: u32 = 0xffff_ffff;
116
/// Mirrors `CallState` from the upstream Component Model spec.
///
/// The explicit discriminants are the integers observed by the guest: they are
/// what `self as u32` yields when a status is packed by [`Status::pack`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional `waitable` handle into the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// The status occupies the low 4 bits and the waitable (zero when absent)
    /// occupies the upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied for [`Status::Returned`], if one is
    /// missing for any other status, or if the waitable does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A waitable accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or_default();
        assert!(waitable < (1 << 28));

        let status_bits = self as u32;
        let waitable_bits = waitable << 4;
        waitable_bits | status_bits
    }
}
140
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// `Event::parts` lowers a value of this type to the pair of integers handed
/// to the guest. The `pending` fields carried by the stream/future variants
/// are ignored by that lowering.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a payload of zero.
    None,
    /// Cancellation event; lowered with a payload of zero.
    Cancelled,
    /// A subtask changed state; the payload is the new status.
    Subtask {
        status: Status,
    },
    /// A stream read produced a return code.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably the (table type, handle) pair of the
        // stream end involved -- confirm against the code which sets it.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write produced a return code.
    StreamWrite {
        code: ReturnCode,
        // NOTE(review): see `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read produced a return code.
    FutureRead {
        code: ReturnCode,
        // NOTE(review): presumably the (table type, handle) pair of the
        // future end involved -- confirm against the code which sets it.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write produced a return code.
    FutureWrite {
        code: ReturnCode,
        // NOTE(review): see `FutureRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
167
168impl Event {
169    /// Lower this event to core Wasm integers for delivery to the guest.
170    ///
171    /// Note that the waitable handle, if any, is assumed to be lowered
172    /// separately.
173    fn parts(self) -> (u32, u32) {
174        const EVENT_NONE: u32 = 0;
175        const EVENT_SUBTASK: u32 = 1;
176        const EVENT_STREAM_READ: u32 = 2;
177        const EVENT_STREAM_WRITE: u32 = 3;
178        const EVENT_FUTURE_READ: u32 = 4;
179        const EVENT_FUTURE_WRITE: u32 = 5;
180        const EVENT_CANCELLED: u32 = 6;
181        match self {
182            Event::None => (EVENT_NONE, 0),
183            Event::Cancelled => (EVENT_CANCELLED, 0),
184            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189        }
190    }
191}
192
/// Corresponds to `CallbackCode` in the spec.
///
/// NOTE(review): the per-constant descriptions below are inferred from the
/// constant names only; confirm them against the spec and the code which
/// interprets callback return values.
mod callback_code {
    /// Presumably: the callback is finished with its task.
    pub const EXIT: u32 = 0;
    /// Presumably: the callback wants to yield to other tasks.
    pub const YIELD: u32 = 1;
    /// Presumably: the callback wants to wait for an event.
    pub const WAIT: u32 = 2;
}
199
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// The value is re-exported from `wasmtime_environ` and converted to `u32`
/// here.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]).
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access is scoped to.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's `T` to `D::Data`, used by `get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217    D: HasData + ?Sized,
218    T: 'static,
219{
220    /// Creates a new [`Access`] from its component parts.
221    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222        Self { store, get_data }
223    }
224
225    /// Get mutable access to the store data.
226    pub fn data_mut(&mut self) -> &mut T {
227        self.store.data_mut()
228    }
229
230    /// Get mutable access to the store data.
231    pub fn get(&mut self) -> D::Data<'_> {
232        (self.get_data)(self.data_mut())
233    }
234
235    /// Spawn a background task.
236    ///
237    /// See [`Accessor::spawn`] for details.
238    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239    where
240        T: 'static,
241    {
242        let accessor = Accessor {
243            get_data: self.get_data,
244            token: StoreToken::new(self.store.as_context_mut()),
245        };
246        self.store
247            .as_context_mut()
248            .spawn_with_accessor(accessor, task)
249    }
250
251    /// Returns the getter this accessor is using to project from `T` into
252    /// `D::Data`.
253    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254        self.get_data
255    }
256}
257
258impl<'a, T, D> AsContext for Access<'a, T, D>
259where
260    D: HasData + ?Sized,
261    T: 'static,
262{
263    type Data = T;
264
265    fn as_context(&self) -> StoreContext<'_, T> {
266        self.store.as_context()
267    }
268}
269
270impl<'a, T, D> AsContextMut for Access<'a, T, D>
271where
272    D: HasData + ?Sized,
273    T: 'static,
274{
275    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276        self.store.as_context_mut()
277    }
278}
279
/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they may panic or have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by `Accessor::with` to turn the ambient store into a
    /// `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store's `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
346
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The accessor is only borrowed, for as long as `self` is borrowed.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376    type Data = T::Data;
377    type AccessorData = T::AccessorData;
378
379    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380        T::as_accessor(self)
381    }
382}
383
384impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385    type Data = T;
386    type AccessorData = D;
387
388    fn as_accessor(&self) -> &Accessor<T, D> {
389        self
390    }
391}
392
393// Note that it is intentional at this time that `Accessor` does not actually
394// store `&mut T` or anything similar. This distinctly enables the `Accessor`
395// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
396// that matter). This is used to ergonomically simplify bindings where the
397// majority of the time `Accessor` is closed over in a future which then needs
398// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
399// you already have to write `T: 'static`...) it helps to avoid this.
400//
401// Note as well that `Accessor` doesn't actually store its data at all. Instead
402// it's more of a "proof" of what can be accessed from TLS. API design around
403// `Accessor` and functions like `Linker::func_wrap_concurrent` are
404// intentionally made to ensure that `Accessor` is ideally only used in the
405// context that TLS variables are actually set. For example host functions are
406// given `&Accessor`, not `Accessor`, and this prevents them from persisting
407// the value outside of a future. Within the future the TLS variables are all
408// guaranteed to be set while the future is being polled.
409//
// Finally, note that this is not an ironclad guarantee, nor does it need to be.
411// The TLS APIs are designed to panic or otherwise model usage where they're
412// called recursively or similar. It's hoped that code cannot be constructed to
413// actually hit this at runtime but this is not a safety requirement at this
414// time.
415const _: () = {
416    const fn assert<T: Send + Sync>() {}
417    assert::<Accessor<UnsafeCell<u32>>>();
418};
419
420impl<T> Accessor<T> {
421    /// Creates a new `Accessor` backed by the specified functions.
422    ///
423    /// - `get`: used to retrieve the store
424    ///
425    /// - `get_data`: used to "project" from the store's associated data to
426    /// another type (e.g. a field of that data or a wrapper around it).
427    ///
428    /// - `spawn`: used to queue spawned background tasks to be run later
429    pub(crate) fn new(token: StoreToken<T>) -> Self {
430        Self {
431            token,
432            get_data: |x| x,
433        }
434    }
435}
436
437impl<T, D> Accessor<T, D>
438where
439    D: HasData + ?Sized,
440{
441    /// Run the specified closure, passing it mutable access to the store.
442    ///
443    /// This function is one of the main building blocks of the [`Accessor`]
444    /// type. This yields synchronous, blocking, access to store via an
445    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
446    /// providing the ability to access `D` via [`Access::get`]. Note that the
447    /// `fun` here is given only temporary access to the store and `T`/`D`
448    /// meaning that the return value `R` here is not allowed to capture borrows
449    /// into the two. If access is needed to data within `T` or `D` outside of
450    /// this closure then it must be `clone`d out, for example.
451    ///
452    /// # Panics
453    ///
454    /// This function will panic if it is call recursively with any other
455    /// accessor already in scope. For example if `with` is called within `fun`,
456    /// then this function will panic. It is up to the embedder to ensure that
457    /// this does not happen.
458    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459        tls::get(|vmstore| {
460            fun(Access {
461                store: self.token.as_context_mut(vmstore),
462                get_data: self.get_data,
463            })
464        })
465    }
466
467    /// Returns the getter this accessor is using to project from `T` into
468    /// `D::Data`.
469    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470        self.get_data
471    }
472
473    /// Changes this accessor to access `D2` instead of the current type
474    /// parameter `D`.
475    ///
476    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
477    ///
478    /// # Panics
479    ///
480    /// When using this API the returned value is disconnected from `&self` and
481    /// the lifetime binding the `self` argument. An `Accessor` only works
482    /// within the context of the closure or async closure that it was
483    /// originally given to, however. This means that due to the fact that the
484    /// returned value has no lifetime connection it's possible to use the
485    /// accessor outside of `&self`, the original accessor, and panic.
486    ///
487    /// The returned value should only be used within the scope of the original
488    /// `Accessor` that `self` refers to.
489    pub fn with_getter<D2: HasData>(
490        &self,
491        get_data: fn(&mut T) -> D2::Data<'_>,
492    ) -> Accessor<T, D2> {
493        Accessor {
494            token: self.token,
495            get_data,
496        }
497    }
498
499    /// Spawn a background task which will receive an `&Accessor<T, D>` and
500    /// run concurrently with any other tasks in progress for the current
501    /// store.
502    ///
503    /// This is particularly useful for host functions which return a `stream`
504    /// or `future` such that the code to write to the write end of that
505    /// `stream` or `future` must run after the function returns.
506    ///
507    /// The returned [`JoinHandle`] may be used to cancel the task.
508    ///
509    /// # Panics
510    ///
511    /// Panics if called within a closure provided to the [`Accessor::with`]
512    /// function. This can only be called outside an active invocation of
513    /// [`Accessor::with`].
514    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515    where
516        T: 'static,
517    {
518        let accessor = self.clone_for_spawn();
519        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520    }
521
522    fn clone_for_spawn(&self) -> Self {
523        Self {
524            token: self.token,
525            get_data: self.get_data,
526        }
527    }
528}
529
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
///
/// Implementors must be `Send + 'static`, and the future returned by `run`
/// must be `Send`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and borrows the `accessor` it should use to reach the
    /// store; the returned future resolves to `Ok(())` or an error.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
548
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import.
    Async {
        /// Raw parameter values supplied by the caller.
        params: Vec<ValRaw>,
        /// Whether the call has a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import.
    Sync {
        /// Raw parameter values supplied by the caller.
        params: Vec<ValRaw>,
        // NOTE(review): presumably the number of flat result values expected
        // by the caller -- confirm against the code which reads it.
        result_count: u32,
    },
}
563
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`; holds the fiber on which
    /// it is waiting.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export; holds the instance involved.
    Callback(Instance),
}
572
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The guest thread which is waiting.
        thread: QualifiedThreadId,
        // NOTE(review): presumably disables the "may this task block?" check
        // when suspending -- confirm against the code handling suspension.
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding { thread: QualifiedThreadId },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        /// The guest thread which is suspending.
        thread: QualifiedThreadId,
        // NOTE(review): see `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
}
595
/// Represents a pending call into guest code for a given guest task.
///
/// `Debug` is implemented by hand for this type; the closures held by the
/// `Start*` variants are not printed.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Like `StartImplicit`, except that the closure does not return a
    /// follow-up call.
    ///
    /// `GuestCall::is_ready` always reports calls of this kind as ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
617
618impl fmt::Debug for GuestCallKind {
619    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
620        match self {
621            Self::DeliverEvent { instance, set } => f
622                .debug_struct("DeliverEvent")
623                .field("instance", instance)
624                .field("set", set)
625                .finish(),
626            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
627            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
628        }
629    }
630}
631
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread (and, via its `task` field, the guest task) the call
    /// is for.
    thread: QualifiedThreadId,
    /// What kind of call this is.
    kind: GuestCallKind,
}
638
639impl GuestCall {
640    /// Returns whether or not the call is ready to run.
641    ///
642    /// A call will not be ready to run if either:
643    ///
644    /// - the (sub-)component instance to be called has already been entered and
645    /// cannot be reentered until an in-progress call completes
646    ///
647    /// - the call is for a not-yet started task and the (sub-)component
648    /// instance to be called has backpressure enabled
649    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
650        let instance = store
651            .concurrent_state_mut()
652            .get_mut(self.thread.task)?
653            .instance;
654        let state = store.instance_state(instance);
655
656        let ready = match &self.kind {
657            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
658            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
659            GuestCallKind::StartExplicit(_) => true,
660        };
661        log::trace!(
662            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
663            state.do_not_enter,
664            state.backpressure
665        );
666        Ok(ready)
667    }
668}
669
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary one-shot function which is given access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
675
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// `Debug` is implemented by hand for this type; only the payload of
/// `GuestCall` is printed.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
688
689impl fmt::Debug for WorkItem {
690    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
691        match self {
692            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
693            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
694            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
695            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
696        }
697    }
698}
699
/// Whether a suspension intrinsic was cancelled or completed
///
/// Note that the derived ordering follows declaration order, i.e.
/// `Cancelled < Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
706
707/// Raise a trap if the calling task is synchronous and trying to block prior to
708/// returning a value.
709pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
710    store.concurrent_state_mut().check_blocking()
711}
712
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// # Parameters
///
/// - `store`: the store on whose behalf the future is polled
/// - `future`: the host function's future, producing the call's result
/// - `caller_instance`: the instance recorded in the temporary `HostTask`
///
/// # Errors
///
/// Returns any error produced by `future`, by the table operations performed
/// here, or by suspending the current fiber.
///
/// # Panics
///
/// Panics if there is no current guest thread and `future` is not immediately
/// ready, or if the result slot of the calling task does not contain a value of
/// type `R` once the host task has completed.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // If there is no current guest thread set, that means the host function was
    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
    // should complete immediately.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                // NOTE(review): this relies on the invariant described above;
                // a future which is not immediately ready panics here.
                unreachable!()
            }
        };
    };

    // Save any existing result stashed in `GuestTask::result` so we can replace
    // it with the new result.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Add a temporary host task into the table so we can track its progress.
    // Note that we'll never allocate a waitable handle for the guest since
    // we're being called synchronously.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        // An error from the host function skips the stashing below and
        // becomes the output of the wrapper future.
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // It completed immediately; check the result and delete the task.
            //
            // NOTE(review): if `result` is an error we return here without
            // deleting `task` and without putting `old_result` back into
            // `GuestTask::result` -- confirm that this is acceptable.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // It did not complete immediately; add it to
            // `ConcurrentState::futures` so it will be polled via the event
            // loop; then use `GuestTask::sync_call_set` to wait for the task to
            // complete, suspending the current fiber until it does so.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;
        }
    }

    // Retrieve and return the result.
    //
    // The slot is swapped back to `old_result`; the first `unwrap` requires
    // that the wrapper future stored a result, the second that it has type `R`.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
818
/// Execute the specified guest call.
///
/// Runs `call` and then any follow-up call it produces: the `DeliverEvent` and
/// `StartImplicit` kinds may each yield another `GuestCall`, which is executed
/// on the next loop iteration, whereas `StartExplicit` never yields one.
///
/// Returns the first error produced by any step; on success all chained calls
/// have been run.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // NOTE(review): the `unwrap` assumes an event is always
                // available by the time this call runs -- presumably
                // guaranteed by whoever queued the call; confirm.
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Pass handle `0` to the callback when the event has no
                // associated waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make `call.thread` the current guest thread while the
                // callback runs, remembering the previous one so it can be
                // restored below.
                let old_thread = state.guest_thread.replace(call.thread);
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.maybe_push_call_context(call.thread.task)?;

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task while it is
                // invoked; it is put back immediately afterwards.
                let callback = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                    .unwrap();

                let code = callback(store, runtime_instance.index, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.maybe_pop_call_context(call.thread.task)?;

                // Interpret the code returned by the callback; this may
                // produce another guest call to run on the next iteration.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                store.concurrent_state_mut().guest_thread = old_thread;
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                // The start function may itself hand back a follow-up call.
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
889
890impl<T> Store<T> {
891    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
892    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
893    where
894        T: Send + 'static,
895    {
896        self.as_context_mut().run_concurrent(fun).await
897    }
898
899    #[doc(hidden)]
900    pub fn assert_concurrent_state_empty(&mut self) {
901        self.as_context_mut().assert_concurrent_state_empty();
902    }
903
904    /// Convenience wrapper for [`StoreContextMut::spawn`].
905    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
906    where
907        T: 'static,
908    {
909        self.as_context_mut().spawn(task)
910    }
911}
912
913impl<T> StoreContextMut<'_, T> {
914    /// Assert that all the relevant tables and queues in the concurrent state
915    /// for this store are empty.
916    ///
917    /// This is for sanity checking in integration tests
918    /// (e.g. `component-async-tests`) that the relevant state has been cleared
919    /// after each test concludes.  This should help us catch leaks, e.g. guest
920    /// tasks which haven't been deleted despite having completed and having
921    /// been dropped by their supertasks.
922    #[doc(hidden)]
923    pub fn assert_concurrent_state_empty(self) {
924        let store = self.0;
925        store
926            .store_data_mut()
927            .components
928            .assert_instance_states_empty();
929        let state = store.concurrent_state_mut();
930        assert!(
931            state.table.get_mut().is_empty(),
932            "non-empty table: {:?}",
933            state.table.get_mut()
934        );
935        assert!(state.high_priority.is_empty());
936        assert!(state.low_priority.is_empty());
937        assert!(state.guest_thread.is_none());
938        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
939        assert!(state.global_error_context_ref_counts.is_empty());
940    }
941
942    /// Spawn a background task to run as part of this instance's event loop.
943    ///
944    /// The task will receive an `&Accessor<U>` and run concurrently with
945    /// any other tasks in progress for the instance.
946    ///
947    /// Note that the task will only make progress if and when the event loop
948    /// for this instance is run.
949    ///
950    /// The returned [`SpawnHandle`] may be used to cancel the task.
951    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
952    where
953        T: 'static,
954    {
955        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
956        self.spawn_with_accessor(accessor, task)
957    }
958
959    /// Internal implementation of `spawn` functions where a `store` is
960    /// available along with an `Accessor`.
961    fn spawn_with_accessor<D>(
962        self,
963        accessor: Accessor<T, D>,
964        task: impl AccessorTask<T, D>,
965    ) -> JoinHandle
966    where
967        T: 'static,
968        D: HasData + ?Sized,
969    {
970        // Create an "abortable future" here where internally the future will
971        // hook calls to poll and possibly spawn more background tasks on each
972        // iteration.
973        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
974        self.0
975            .concurrent_state_mut()
976            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
977        handle
978    }
979
980    /// Run the specified closure `fun` to completion as part of this store's
981    /// event loop.
982    ///
983    /// This will run `fun` as part of this store's event loop until it
984    /// yields a result.  `fun` is provided an [`Accessor`], which provides
985    /// controlled access to the store and its data.
986    ///
987    /// This function can be used to invoke [`Func::call_concurrent`] for
988    /// example within the async closure provided here.
989    ///
990    /// # Store-blocking behavior
991    ///
992    ///
993    ///
994    /// At this time there are certain situations in which the `Future` returned
995    /// by the `AsyncFnOnce` passed to this function will not be polled for an
996    /// extended period of time, despite one or more `Waker::wake` events having
997    /// occurred for the task to which it belongs.  This can manifest as the
998    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
999    /// the `Store` being held by e.g. a blocking host function, preventing the
1000    /// `Future` from being polled. A canonical example of this is when the
1001    /// `fun` provided to this function attempts to set a timeout for an
1002    /// invocation of a wasm function. In this situation the async closure is
1003    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1004    /// to elapse. At this time this setup will not always work and the timeout
1005    /// may not reliably fire.
1006    ///
1007    /// This function will not block the current thread and as such is always
1008    /// suitable to run in an `async` context, but the current implementation of
1009    /// Wasmtime can lead to situations where a certain wasm computation is
1010    /// required to make progress the closure to make progress. This is an
1011    /// artifact of Wasmtime's historical implementation of `async` functions
1012    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1013    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1014    /// progress for a readiness notification of (b) to get delivered.
1015    ///
1016    /// This effectively means that it's not possible to reliably perform a
1017    /// "select" operation within the `fun` closure, which timeouts for example
1018    /// are based on. Fixing this requires some relatively major refactoring
1019    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1020    /// that is intended to be fixed one day. In the meantime it's recommended
1021    /// to apply timeouts or such to the entire `run_concurrent` call itself
1022    /// rather than internally.
1023    ///
1024    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1025    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1026    ///
1027    /// # Example
1028    ///
1029    /// ```
1030    /// # use {
1031    /// #   anyhow::{Result},
1032    /// #   wasmtime::{
1033    /// #     component::{ Component, Linker, Resource, ResourceTable},
1034    /// #     Config, Engine, Store
1035    /// #   },
1036    /// # };
1037    /// #
1038    /// # struct MyResource(u32);
1039    /// # struct Ctx { table: ResourceTable }
1040    /// #
1041    /// # async fn foo() -> Result<()> {
1042    /// # let mut config = Config::new();
1043    /// # let engine = Engine::new(&config)?;
1044    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1045    /// # let mut linker = Linker::new(&engine);
1046    /// # let component = Component::new(&engine, "")?;
1047    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1048    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1049    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1050    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1051    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1052    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1053    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1054    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1055    ///    Ok(())
1056    /// }).await??;
1057    /// # Ok(())
1058    /// # }
1059    /// ```
1060    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1061    where
1062        T: Send + 'static,
1063    {
1064        self.do_run_concurrent(fun, false).await
1065    }
1066
1067    pub(super) async fn run_concurrent_trap_on_idle<R>(
1068        self,
1069        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1070    ) -> Result<R>
1071    where
1072        T: Send + 'static,
1073    {
1074        self.do_run_concurrent(fun, true).await
1075    }
1076
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` for this store and drives the
    /// resulting future via `poll_until`, forwarding `trap_on_idle` to it.
    /// Returns the future's output, or whatever error `poll_until` reports.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects re-entrant use of the event loop;
        // see `check_recursive_run` for the exact behavior.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        /// Owns both the store context and the future returned by `fun` so
        /// that the future is dropped inside `tls::set`, i.e. with the store
        /// installed in thread-local state while its destructor runs.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        // The future is created in place inside `dropper` and is only ever
        // accessed through the pinned reference created below.
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1119
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration of the loop polls, in order: `future` itself, the
    /// futures in `ConcurrentState::futures`, and then the "high priority" and
    /// "low priority" work queues.  Any work items found are handled via
    /// `handle_work_item` before the next iteration.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the futures in `ConcurrentState::futures`
    /// fails, if handling a work item fails, or (when `trap_on_idle` is true)
    /// with `Trap::AsyncDeadlock` if nothing remains to be done while `future`
    /// is still pending.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        /// Guard which puts the `FuturesUnordered` taken out of
        /// `ConcurrentState::futures` back into the store when dropped, so it
        /// is restored even if the enclosing future is dropped mid-poll.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // The result is `Either::Left(value)` when `future` completed, or
            // `Either::Right(items)` with a (possibly empty) batch of work
            // items to handle before looping again.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // `Ready(true)` means one future completed successfully,
                // `Ready(false)` means there are no futures left, and
                // `Pending` means at least one is still in progress.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    /// Guard which cleans up any work items that were taken
                    /// off a queue but never handled (e.g. because handling an
                    /// earlier item failed or this future was dropped): fibers
                    /// are disposed of and futures are dropped inside
                    /// `tls::set`; other kinds of item are simply dropped.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Handle the items one at a time; if one fails, the `?`
                    // returns early and `dispose` cleans up the remainder.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1287
1288    /// Handle the specified work item, possibly resuming a fiber if applicable.
1289    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1290    where
1291        T: Send,
1292    {
1293        log::trace!("handle work item {item:?}");
1294        match item {
1295            WorkItem::PushFuture(future) => {
1296                self.0
1297                    .concurrent_state_mut()
1298                    .futures
1299                    .get_mut()
1300                    .as_mut()
1301                    .unwrap()
1302                    .push(future.into_inner());
1303            }
1304            WorkItem::ResumeFiber(fiber) => {
1305                self.0.resume_fiber(fiber).await?;
1306            }
1307            WorkItem::GuestCall(call) => {
1308                if call.is_ready(self.0)? {
1309                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1310                } else {
1311                    let state = self.0.concurrent_state_mut();
1312                    let task = state.get_mut(call.thread.task)?;
1313                    if !task.starting_sent {
1314                        task.starting_sent = true;
1315                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1316                            Waitable::Guest(call.thread.task).set_event(
1317                                state,
1318                                Some(Event::Subtask {
1319                                    status: Status::Starting,
1320                                }),
1321                            )?;
1322                        }
1323                    }
1324
1325                    let instance = state.get_mut(call.thread.task)?.instance;
1326                    self.0
1327                        .instance_state(instance)
1328                        .pending
1329                        .insert(call.thread, call.kind);
1330                }
1331            }
1332            WorkItem::WorkerFunction(fun) => {
1333                self.run_on_worker(WorkerItem::Function(fun)).await?;
1334            }
1335        }
1336
1337        Ok(())
1338    }
1339
1340    /// Execute the specified guest call on a worker fiber.
1341    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1342    where
1343        T: Send,
1344    {
1345        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1346            fiber
1347        } else {
1348            fiber::make_fiber(self.0, move |store| {
1349                loop {
1350                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1351                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1352                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1353                    }
1354
1355                    store.suspend(SuspendReason::NeedWork)?;
1356                }
1357            })?
1358        };
1359
1360        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1361        assert!(worker_item.is_none());
1362        *worker_item = Some(item);
1363
1364        self.0.resume_fiber(worker).await
1365    }
1366
1367    /// Wrap the specified host function in a future which will call it, passing
1368    /// it an `&Accessor<T>`.
1369    ///
1370    /// See the `Accessor` documentation for details.
1371    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1372    where
1373        T: 'static,
1374        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1375            + Send
1376            + Sync
1377            + 'static,
1378        R: Send + Sync + 'static,
1379    {
1380        let token = StoreToken::new(self);
1381        async move {
1382            let mut accessor = Accessor::new(token);
1383            closure(&mut accessor).await
1384        }
1385    }
1386}
1387
/// Identifies a particular (sub-)component instance by pairing the ID of a
/// component instance with a runtime component instance index within it.
#[derive(Debug, Copy, Clone)]
pub struct RuntimeInstance {
    /// ID of the component instance, used to look it up in the store.
    pub instance: ComponentInstanceId,
    /// Index used to select the per-instance state within `instance`.
    pub index: RuntimeComponentInstanceIndex,
}
1393
1394impl StoreOpaque {
1395    /// Helper function to retrieve the `ConcurrentInstanceState` for the
1396    /// specified instance.
1397    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1398        StoreComponentInstanceId::new(self.id(), instance.instance)
1399            .get_mut(self)
1400            .instance_state(instance.index)
1401            .unwrap()
1402            .concurrent_state()
1403    }
1404
1405    /// Helper function to retrieve the `HandleTable` for the specified
1406    /// instance.
1407    fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1408        StoreComponentInstanceId::new(self.id(), instance.instance)
1409            .get_mut(self)
1410            .instance_state(instance.index)
1411            .unwrap()
1412            .handle_table()
1413    }
1414
1415    /// Record that we're about to enter a (sub-)component instance which does
1416    /// not support more than one concurrent, stackful activation, meaning it
1417    /// cannot be entered again until the next call returns.
1418    fn enter_instance(&mut self, instance: RuntimeInstance) {
1419        log::trace!("enter {instance:?}");
1420        self.instance_state(instance).do_not_enter = true;
1421    }
1422
1423    /// Record that we've exited a (sub-)component instance previously entered
1424    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1425    /// See the documentation for the latter for details.
1426    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1427        log::trace!("exit {instance:?}");
1428        self.instance_state(instance).do_not_enter = false;
1429        self.partition_pending(instance)
1430    }
1431
1432    /// Iterate over `InstanceState::pending`, moving any ready items into the
1433    /// "high priority" work item queue.
1434    ///
1435    /// See `GuestCall::is_ready` for details.
1436    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1437        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1438            let call = GuestCall { thread, kind };
1439            if call.is_ready(self)? {
1440                self.concurrent_state_mut()
1441                    .push_high_priority(WorkItem::GuestCall(call));
1442            } else {
1443                self.instance_state(instance)
1444                    .pending
1445                    .insert(call.thread, call.kind);
1446            }
1447        }
1448
1449        Ok(())
1450    }
1451
1452    /// Implements the `backpressure.{inc,dec}` intrinsics.
1453    pub(crate) fn backpressure_modify(
1454        &mut self,
1455        caller_instance: RuntimeInstance,
1456        modify: impl FnOnce(u16) -> Option<u16>,
1457    ) -> Result<()> {
1458        let state = self.instance_state(caller_instance);
1459        let old = state.backpressure;
1460        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
1461        state.backpressure = new;
1462
1463        if old > 0 && new == 0 {
1464            // Backpressure was previously enabled and is now disabled; move any
1465            // newly-eligible guest calls to the "high priority" queue.
1466            self.partition_pending(caller_instance)?;
1467        }
1468
1469        Ok(())
1470    }
1471
    /// Resume the specified fiber, giving it exclusive access to the specified
    /// store.
    ///
    /// Once the fiber suspends again, it is stashed according to the
    /// `SuspendReason` it recorded in `ConcurrentState::suspend_reason`; if it
    /// ran to completion instead, nothing further is done with it.  Either
    /// way, the guest thread which was current before the call is made current
    /// again.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        // Remember which guest thread (if any) is current so it can be
        // reinstated after the fiber has run.
        let old_thread = self.concurrent_state_mut().guest_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // `fiber` is `Some` afterwards if the fiber suspended, or `None` if it
        // exited.
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        let state = self.concurrent_state_mut();

        state.guest_thread = old_thread;
        if let Some(ref ot) = old_thread {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // See the `SuspendReason` documentation for what each case means.
            match state.suspend_reason.take().unwrap() {
                SuspendReason::NeedWork => {
                    // Cache at most one idle worker fiber for reuse; dispose
                    // of this one if a worker is already cached.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    // Mark the thread as pending and queue the fiber to be
                    // resumed as a low-priority work item.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    // Park the fiber in the thread's own state.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    // Register the fiber as waiting on `set`; the thread must
                    // not already have a waiter registered there.
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1520
1521    /// Suspend the current fiber, storing the reason in
1522    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1523    /// it should be resumed.
1524    ///
1525    /// See the `SuspendReason` documentation for details.
1526    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1527        log::trace!("suspend fiber: {reason:?}");
1528
1529        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1530        // pop the call context which manages resource borrows before suspending
1531        // and then push it again once we've resumed.
1532        let task = match &reason {
1533            SuspendReason::Yielding { thread, .. }
1534            | SuspendReason::Waiting { thread, .. }
1535            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1536            SuspendReason::NeedWork => None,
1537        };
1538
1539        let old_guest_thread = if let Some(task) = task {
1540            self.maybe_pop_call_context(task)?;
1541            self.concurrent_state_mut().guest_thread
1542        } else {
1543            None
1544        };
1545
1546        // We should not have reached here unless either there's no current
1547        // task, or the current task is permitted to block.  In addition, we
1548        // special-case `thread.switch-to` and waiting for a subtask to go from
1549        // `starting` to `started`, both of which we consider non-blocking
1550        // operations despite requiring a suspend.
1551        assert!(
1552            matches!(
1553                reason,
1554                SuspendReason::ExplicitlySuspending {
1555                    skip_may_block_check: true,
1556                    ..
1557                } | SuspendReason::Waiting {
1558                    skip_may_block_check: true,
1559                    ..
1560                }
1561            ) || old_guest_thread
1562                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1563                .unwrap_or(true)
1564        );
1565
1566        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1567        assert!(suspend_reason.is_none());
1568        *suspend_reason = Some(reason);
1569
1570        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1571
1572        if let Some(task) = task {
1573            self.concurrent_state_mut().guest_thread = old_guest_thread;
1574            self.maybe_push_call_context(task)?;
1575        }
1576
1577        Ok(())
1578    }
1579
1580    /// Push the call context for managing resource borrows for the specified
1581    /// guest task if it has not yet either returned a result or cancelled
1582    /// itself.
1583    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1584        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1585
1586        if !task.returned_or_cancelled() {
1587            log::trace!("push call context for {guest_task:?}");
1588            let call_context = task.call_context.take().unwrap();
1589            self.component_resource_state().0.push(call_context);
1590        }
1591        Ok(())
1592    }
1593
1594    /// Pop the call context for managing resource borrows for the specified
1595    /// guest task if it has not yet either returned a result or cancelled
1596    /// itself.
1597    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1598        if !self
1599            .concurrent_state_mut()
1600            .get_mut(guest_task)?
1601            .returned_or_cancelled()
1602        {
1603            log::trace!("pop call context for {guest_task:?}");
1604            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1605            self.concurrent_state_mut()
1606                .get_mut(guest_task)?
1607                .call_context = call_context;
1608        }
1609        Ok(())
1610    }
1611
1612    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1613        let state = self.concurrent_state_mut();
1614        let caller = state.guest_thread.unwrap();
1615        let old_set = waitable.common(state)?.set;
1616        let set = state.get_mut(caller.task)?.sync_call_set;
1617        waitable.join(state, Some(set))?;
1618        self.suspend(SuspendReason::Waiting {
1619            set,
1620            thread: caller,
1621            skip_may_block_check: false,
1622        })?;
1623        let state = self.concurrent_state_mut();
1624        waitable.join(state, old_set)
1625    }
1626}
1627
1628impl Instance {
1629    /// Get the next pending event for the specified task and (optional)
1630    /// waitable set, along with the waitable handle if applicable.
1631    fn get_event(
1632        self,
1633        store: &mut StoreOpaque,
1634        guest_task: TableId<GuestTask>,
1635        set: Option<TableId<WaitableSet>>,
1636        cancellable: bool,
1637    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1638        let state = store.concurrent_state_mut();
1639
1640        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1641            log::trace!("deliver event {event:?} to {guest_task:?}");
1642
1643            if cancellable || !matches!(event, Event::Cancelled) {
1644                return Ok(Some((event, None)));
1645            } else {
1646                state.get_mut(guest_task)?.event = Some(event);
1647            }
1648        }
1649
1650        Ok(
1651            if let Some((set, waitable)) = set
1652                .and_then(|set| {
1653                    state
1654                        .get_mut(set)
1655                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1656                        .transpose()
1657                })
1658                .transpose()?
1659            {
1660                let common = waitable.common(state)?;
1661                let handle = common.handle.unwrap();
1662                let event = common.event.take().unwrap();
1663
1664                log::trace!(
1665                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1666                );
1667
1668                waitable.on_delivery(store, self, event);
1669
1670                Some((event, Some((waitable, handle))))
1671            } else {
1672                None
1673            },
1674        )
1675    }
1676
1677    /// Handle the `CallbackCode` returned from an async-lifted export or its
1678    /// callback.
1679    ///
1680    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1681    /// using `handle_guest_call`.
1682    fn handle_callback_code(
1683        self,
1684        store: &mut StoreOpaque,
1685        guest_thread: QualifiedThreadId,
1686        runtime_instance: RuntimeComponentInstanceIndex,
1687        code: u32,
1688    ) -> Result<Option<GuestCall>> {
1689        let (code, set) = unpack_callback_code(code);
1690
1691        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1692
1693        let state = store.concurrent_state_mut();
1694
1695        let get_set = |store: &mut StoreOpaque, handle| {
1696            if handle == 0 {
1697                bail!("invalid waitable-set handle");
1698            }
1699
1700            let set = store
1701                .handle_table(RuntimeInstance {
1702                    instance: self.id().instance(),
1703                    index: runtime_instance,
1704                })
1705                .waitable_set_rep(handle)?;
1706
1707            Ok(TableId::<WaitableSet>::new(set))
1708        };
1709
1710        Ok(match code {
1711            callback_code::EXIT => {
1712                log::trace!("implicit thread {guest_thread:?} completed");
1713                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1714                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715                if task.threads.is_empty() && !task.returned_or_cancelled() {
1716                    bail!(Trap::NoAsyncResult);
1717                }
1718                match &task.caller {
1719                    Caller::Host { .. } => {
1720                        if task.ready_to_delete() {
1721                            Waitable::Guest(guest_thread.task)
1722                                .delete_from(store.concurrent_state_mut())?;
1723                        }
1724                    }
1725                    Caller::Guest { .. } => {
1726                        task.exited = true;
1727                        task.callback = None;
1728                    }
1729                }
1730                None
1731            }
1732            callback_code::YIELD => {
1733                let task = state.get_mut(guest_thread.task)?;
1734                // If an `Event::Cancelled` is pending, we'll deliver that;
1735                // otherwise, we'll deliver `Event::None`.  Note that
1736                // `GuestTask::event` is only ever set to one of those two
1737                // `Event` variants.
1738                if let Some(event) = task.event {
1739                    assert!(matches!(event, Event::None | Event::Cancelled));
1740                } else {
1741                    task.event = Some(Event::None);
1742                }
1743                let call = GuestCall {
1744                    thread: guest_thread,
1745                    kind: GuestCallKind::DeliverEvent {
1746                        instance: self,
1747                        set: None,
1748                    },
1749                };
1750                if state.may_block(guest_thread.task) {
1751                    // Push this thread onto the "low priority" queue so it runs
1752                    // after any other threads have had a chance to run.
1753                    state.push_low_priority(WorkItem::GuestCall(call));
1754                    None
1755                } else {
1756                    // Yielding in a non-blocking context is defined as a no-op
1757                    // according to the spec, so we must run this thread
1758                    // immediately without allowing any others to run.
1759                    Some(call)
1760                }
1761            }
1762            callback_code::WAIT => {
1763                // The task may only return `WAIT` if it was created for a call
1764                // to an async export).  Otherwise, we'll trap.
1765                state.check_blocking_for(guest_thread.task)?;
1766
1767                let set = get_set(store, set)?;
1768                let state = store.concurrent_state_mut();
1769
1770                if state.get_mut(guest_thread.task)?.event.is_some()
1771                    || !state.get_mut(set)?.ready.is_empty()
1772                {
1773                    // An event is immediately available; deliver it ASAP.
1774                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1775                        thread: guest_thread,
1776                        kind: GuestCallKind::DeliverEvent {
1777                            instance: self,
1778                            set: Some(set),
1779                        },
1780                    }));
1781                } else {
1782                    // No event is immediately available.
1783                    //
1784                    // We're waiting, so register to be woken up when an event
1785                    // is published for this waitable set.
1786                    //
1787                    // Here we also set `GuestTask::wake_on_cancel` which allows
1788                    // `subtask.cancel` to interrupt the wait.
1789                    let old = state
1790                        .get_mut(guest_thread.thread)?
1791                        .wake_on_cancel
1792                        .replace(set);
1793                    assert!(old.is_none());
1794                    let old = state
1795                        .get_mut(set)?
1796                        .waiting
1797                        .insert(guest_thread, WaitMode::Callback(self));
1798                    assert!(old.is_none());
1799                }
1800                None
1801            }
1802            _ => bail!("unsupported callback code: {code}"),
1803        })
1804    }
1805
1806    fn cleanup_thread(
1807        self,
1808        store: &mut StoreOpaque,
1809        guest_thread: QualifiedThreadId,
1810        runtime_instance: RuntimeComponentInstanceIndex,
1811    ) -> Result<()> {
1812        let guest_id = store
1813            .concurrent_state_mut()
1814            .get_mut(guest_thread.thread)?
1815            .instance_rep;
1816        store
1817            .handle_table(RuntimeInstance {
1818                instance: self.id().instance(),
1819                index: runtime_instance,
1820            })
1821            .guest_thread_remove(guest_id.unwrap())?;
1822
1823        store.concurrent_state_mut().delete(guest_thread.thread)?;
1824        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1825        task.threads.remove(&guest_thread.thread);
1826        Ok(())
1827    }
1828
1829    /// Add the specified guest call to the "high priority" work item queue, to
1830    /// be started as soon as backpressure and/or reentrance rules allow.
1831    ///
1832    /// SAFETY: The raw pointer arguments must be valid references to guest
1833    /// functions (with the appropriate signatures) when the closures queued by
1834    /// this function are called.
1835    unsafe fn queue_call<T: 'static>(
1836        self,
1837        mut store: StoreContextMut<T>,
1838        guest_thread: QualifiedThreadId,
1839        callee: SendSyncPtr<VMFuncRef>,
1840        param_count: usize,
1841        result_count: usize,
1842        flags: Option<InstanceFlags>,
1843        async_: bool,
1844        callback: Option<SendSyncPtr<VMFuncRef>>,
1845        post_return: Option<SendSyncPtr<VMFuncRef>>,
1846    ) -> Result<()> {
1847        /// Return a closure which will call the specified function in the scope
1848        /// of the specified task.
1849        ///
1850        /// This will use `GuestTask::lower_params` to lower the parameters, but
1851        /// will not lift the result; instead, it returns a
1852        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1853        /// any, may be lifted.  Note that an async-lifted export will have
1854        /// returned its result using the `task.return` intrinsic (or not
1855        /// returned a result at all, in the case of `task.cancel`), in which
1856        /// case the "result" of this call will either be a callback code or
1857        /// nothing.
1858        ///
1859        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1860        /// the returned closure is called.
1861        unsafe fn make_call<T: 'static>(
1862            store: StoreContextMut<T>,
1863            guest_thread: QualifiedThreadId,
1864            callee: SendSyncPtr<VMFuncRef>,
1865            param_count: usize,
1866            result_count: usize,
1867            flags: Option<InstanceFlags>,
1868        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1869        + Send
1870        + Sync
1871        + 'static
1872        + use<T> {
1873            let token = StoreToken::new(store);
1874            move |store: &mut dyn VMStore| {
1875                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1876
1877                store
1878                    .concurrent_state_mut()
1879                    .get_mut(guest_thread.thread)?
1880                    .state = GuestThreadState::Running;
1881                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1882                let may_enter_after_call = task.call_post_return_automatically();
1883                let lower = task.lower_params.take().unwrap();
1884
1885                lower(store, &mut storage[..param_count])?;
1886
1887                let mut store = token.as_context_mut(store);
1888
1889                // SAFETY: Per the contract documented in `make_call's`
1890                // documentation, `callee` must be a valid pointer.
1891                unsafe {
1892                    if let Some(mut flags) = flags {
1893                        flags.set_may_enter(false);
1894                    }
1895                    crate::Func::call_unchecked_raw(
1896                        &mut store,
1897                        callee.as_non_null(),
1898                        NonNull::new(
1899                            &mut storage[..param_count.max(result_count)]
1900                                as *mut [MaybeUninit<ValRaw>] as _,
1901                        )
1902                        .unwrap(),
1903                    )?;
1904                    if let Some(mut flags) = flags {
1905                        flags.set_may_enter(may_enter_after_call);
1906                    }
1907                }
1908
1909                Ok(storage)
1910            }
1911        }
1912
1913        // SAFETY: Per the contract described in this function documentation,
1914        // the `callee` pointer which `call` closes over must be valid when
1915        // called by the closure we queue below.
1916        let call = unsafe {
1917            make_call(
1918                store.as_context_mut(),
1919                guest_thread,
1920                callee,
1921                param_count,
1922                result_count,
1923                flags,
1924            )
1925        };
1926
1927        let callee_instance = store
1928            .0
1929            .concurrent_state_mut()
1930            .get_mut(guest_thread.task)?
1931            .instance;
1932
1933        let fun = if callback.is_some() {
1934            assert!(async_);
1935
1936            Box::new(move |store: &mut dyn VMStore| {
1937                self.add_guest_thread_to_instance_table(
1938                    guest_thread.thread,
1939                    store,
1940                    callee_instance.index,
1941                )?;
1942                let old_thread = store
1943                    .concurrent_state_mut()
1944                    .guest_thread
1945                    .replace(guest_thread);
1946                log::trace!(
1947                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1948                );
1949
1950                store.maybe_push_call_context(guest_thread.task)?;
1951
1952                store.enter_instance(callee_instance);
1953
1954                // SAFETY: See the documentation for `make_call` to review the
1955                // contract we must uphold for `call` here.
1956                //
1957                // Per the contract described in the `queue_call`
1958                // documentation, the `callee` pointer which `call` closes
1959                // over must be valid.
1960                let storage = call(store)?;
1961
1962                store.exit_instance(callee_instance)?;
1963
1964                store.maybe_pop_call_context(guest_thread.task)?;
1965
1966                let state = store.concurrent_state_mut();
1967                state.guest_thread = old_thread;
1968                old_thread
1969                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1970                log::trace!("stackless call: restored {old_thread:?} as current thread");
1971
1972                // SAFETY: `wasmparser` will have validated that the callback
1973                // function returns a `i32` result.
1974                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1975
1976                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
1977            })
1978                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
1979        } else {
1980            let token = StoreToken::new(store.as_context_mut());
1981            Box::new(move |store: &mut dyn VMStore| {
1982                self.add_guest_thread_to_instance_table(
1983                    guest_thread.thread,
1984                    store,
1985                    callee_instance.index,
1986                )?;
1987                let old_thread = store
1988                    .concurrent_state_mut()
1989                    .guest_thread
1990                    .replace(guest_thread);
1991                log::trace!(
1992                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1993                );
1994                let mut flags = self.id().get(store).instance_flags(callee_instance.index);
1995
1996                store.maybe_push_call_context(guest_thread.task)?;
1997
1998                // Unless this is a callback-less (i.e. stackful)
1999                // async-lifted export, we need to record that the instance
2000                // cannot be entered until the call returns.
2001                if !async_ {
2002                    store.enter_instance(callee_instance);
2003                }
2004
2005                // SAFETY: See the documentation for `make_call` to review the
2006                // contract we must uphold for `call` here.
2007                //
2008                // Per the contract described in the `queue_call`
2009                // documentation, the `callee` pointer which `call` closes
2010                // over must be valid.
2011                let storage = call(store)?;
2012
2013                // This is a callback-less call, so the implicit thread has now completed
2014                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2015
2016                if async_ {
2017                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2018                    if task.threads.is_empty() && !task.returned_or_cancelled() {
2019                        bail!(Trap::NoAsyncResult);
2020                    }
2021                } else {
2022                    // This is a sync-lifted export, so now is when we lift the
2023                    // result, optionally call the post-return function, if any,
2024                    // and finally notify any current or future waiters that the
2025                    // subtask has returned.
2026
2027                    let lift = {
2028                        store.exit_instance(callee_instance)?;
2029
2030                        let state = store.concurrent_state_mut();
2031                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2032
2033                        state
2034                            .get_mut(guest_thread.task)?
2035                            .lift_result
2036                            .take()
2037                            .unwrap()
2038                    };
2039
2040                    // SAFETY: `result_count` represents the number of core Wasm
2041                    // results returned, per `wasmparser`.
2042                    let result = (lift.lift)(store, unsafe {
2043                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2044                            &storage[..result_count],
2045                        )
2046                    })?;
2047
2048                    let post_return_arg = match result_count {
2049                        0 => ValRaw::i32(0),
2050                        // SAFETY: `result_count` represents the number of
2051                        // core Wasm results returned, per `wasmparser`.
2052                        1 => unsafe { storage[0].assume_init() },
2053                        _ => unreachable!(),
2054                    };
2055
2056                    if store
2057                        .concurrent_state_mut()
2058                        .get_mut(guest_thread.task)?
2059                        .call_post_return_automatically()
2060                    {
2061                        unsafe {
2062                            flags.set_may_leave(false);
2063                            flags.set_needs_post_return(false);
2064                        }
2065
2066                        if let Some(func) = post_return {
2067                            let mut store = token.as_context_mut(store);
2068
2069                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
2070                            // either `wasmtime-cranelift`-generated fused adapter
2071                            // code or `component::Options`.  Per `wasmparser`
2072                            // post-return signature validation, we know it takes a
2073                            // single parameter.
2074                            unsafe {
2075                                crate::Func::call_unchecked_raw(
2076                                    &mut store,
2077                                    func.as_non_null(),
2078                                    slice::from_ref(&post_return_arg).into(),
2079                                )?;
2080                            }
2081                        }
2082
2083                        unsafe {
2084                            flags.set_may_leave(true);
2085                            flags.set_may_enter(true);
2086                        }
2087                    }
2088
2089                    self.task_complete(
2090                        store,
2091                        guest_thread.task,
2092                        result,
2093                        Status::Returned,
2094                        post_return_arg,
2095                    )?;
2096                }
2097
2098                store.maybe_pop_call_context(guest_thread.task)?;
2099
2100                let state = store.concurrent_state_mut();
2101                let task = state.get_mut(guest_thread.task)?;
2102
2103                match &task.caller {
2104                    Caller::Host { .. } => {
2105                        if task.ready_to_delete() {
2106                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2107                        }
2108                    }
2109                    Caller::Guest { .. } => {
2110                        task.exited = true;
2111                    }
2112                }
2113
2114                Ok(None)
2115            })
2116        };
2117
2118        store
2119            .0
2120            .concurrent_state_mut()
2121            .push_high_priority(WorkItem::GuestCall(GuestCall {
2122                thread: guest_thread,
2123                kind: GuestCallKind::StartImplicit(fun),
2124            }));
2125
2126        Ok(())
2127    }
2128
2129    /// Prepare (but do not start) a guest->guest call.
2130    ///
2131    /// This is called from fused adapter code generated in
2132    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2133    /// are synthesized Wasm functions which move the parameters from the caller
2134    /// to the callee and the result from the callee to the caller,
2135    /// respectively.  The adapter will call `Self::start_call` immediately
2136    /// after calling this function.
2137    ///
2138    /// SAFETY: All the pointer arguments must be valid pointers to guest
2139    /// entities (and with the expected signatures for the function references
2140    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2141    unsafe fn prepare_call<T: 'static>(
2142        self,
2143        mut store: StoreContextMut<T>,
2144        start: *mut VMFuncRef,
2145        return_: *mut VMFuncRef,
2146        caller_instance: RuntimeComponentInstanceIndex,
2147        callee_instance: RuntimeComponentInstanceIndex,
2148        task_return_type: TypeTupleIndex,
2149        callee_async: bool,
2150        memory: *mut VMMemoryDefinition,
2151        string_encoding: u8,
2152        caller_info: CallerInfo,
2153    ) -> Result<()> {
2154        self.id().get(store.0).check_may_leave(caller_instance)?;
2155
2156        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2157            // A task may only call an async-typed function via a sync lower if
2158            // it was created by a call to an async export.  Otherwise, we'll
2159            // trap.
2160            store.0.concurrent_state_mut().check_blocking()?;
2161        }
2162
2163        enum ResultInfo {
2164            Heap { results: u32 },
2165            Stack { result_count: u32 },
2166        }
2167
2168        let result_info = match &caller_info {
2169            CallerInfo::Async {
2170                has_result: true,
2171                params,
2172            } => ResultInfo::Heap {
2173                results: params.last().unwrap().get_u32(),
2174            },
2175            CallerInfo::Async {
2176                has_result: false, ..
2177            } => ResultInfo::Stack { result_count: 0 },
2178            CallerInfo::Sync {
2179                result_count,
2180                params,
2181            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2182                results: params.last().unwrap().get_u32(),
2183            },
2184            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2185                result_count: *result_count,
2186            },
2187        };
2188
2189        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2190
2191        // Create a new guest task for the call, closing over the `start` and
2192        // `return_` functions to lift the parameters and lower the result,
2193        // respectively.
2194        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2195        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2196        let token = StoreToken::new(store.as_context_mut());
2197        let state = store.0.concurrent_state_mut();
2198        let old_thread = state.guest_thread.take();
2199        let new_task = GuestTask::new(
2200            state,
2201            Box::new(move |store, dst| {
2202                let mut store = token.as_context_mut(store);
2203                assert!(dst.len() <= MAX_FLAT_PARAMS);
2204                // The `+ 1` here accounts for the return pointer, if any:
2205                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2206                let count = match caller_info {
2207                    // Async callers, if they have a result, use the last
2208                    // parameter as a return pointer so chop that off if
2209                    // relevant here.
2210                    CallerInfo::Async { params, has_result } => {
2211                        let params = &params[..params.len() - usize::from(has_result)];
2212                        for (param, src) in params.iter().zip(&mut src) {
2213                            src.write(*param);
2214                        }
2215                        params.len()
2216                    }
2217
2218                    // Sync callers forward everything directly.
2219                    CallerInfo::Sync { params, .. } => {
2220                        for (param, src) in params.iter().zip(&mut src) {
2221                            src.write(*param);
2222                        }
2223                        params.len()
2224                    }
2225                };
2226                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2227                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2228                // how it was constructed (see
2229                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2230                // for details) we know it takes count parameters and returns
2231                // `dst.len()` results.
2232                unsafe {
2233                    crate::Func::call_unchecked_raw(
2234                        &mut store,
2235                        start.as_non_null(),
2236                        NonNull::new(
2237                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2238                        )
2239                        .unwrap(),
2240                    )?;
2241                }
2242                dst.copy_from_slice(&src[..dst.len()]);
2243                let state = store.0.concurrent_state_mut();
2244                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2245                    state,
2246                    Some(Event::Subtask {
2247                        status: Status::Started,
2248                    }),
2249                )?;
2250                Ok(())
2251            }),
2252            LiftResult {
2253                lift: Box::new(move |store, src| {
2254                    // SAFETY: See comment in closure passed as `lower_params`
2255                    // parameter above.
2256                    let mut store = token.as_context_mut(store);
2257                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2258                    if let ResultInfo::Heap { results } = &result_info {
2259                        my_src.push(ValRaw::u32(*results));
2260                    }
2261                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2262                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2263                    // on how it was constructed (see
2264                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2265                    // for details) we know it takes `src.len()` parameters and
2266                    // returns up to 1 result.
2267                    unsafe {
2268                        crate::Func::call_unchecked_raw(
2269                            &mut store,
2270                            return_.as_non_null(),
2271                            my_src.as_mut_slice().into(),
2272                        )?;
2273                    }
2274                    let state = store.0.concurrent_state_mut();
2275                    let thread = state.guest_thread.unwrap();
2276                    if sync_caller {
2277                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2278                            if let ResultInfo::Stack { result_count } = &result_info {
2279                                match result_count {
2280                                    0 => None,
2281                                    1 => Some(my_src[0]),
2282                                    _ => unreachable!(),
2283                                }
2284                            } else {
2285                                None
2286                            },
2287                        );
2288                    }
2289                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2290                }),
2291                ty: task_return_type,
2292                memory: NonNull::new(memory).map(SendSyncPtr::new),
2293                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2294            },
2295            Caller::Guest {
2296                thread: old_thread.unwrap(),
2297                instance: caller_instance,
2298            },
2299            None,
2300            self,
2301            callee_instance,
2302            callee_async,
2303        )?;
2304
2305        let guest_task = state.push(new_task)?;
2306        let new_thread = GuestThread::new_implicit(guest_task);
2307        let guest_thread = state.push(new_thread)?;
2308        state.get_mut(guest_task)?.threads.insert(guest_thread);
2309
2310        let state = store.0.concurrent_state_mut();
2311        if let Some(old_thread) = old_thread {
2312            if !state.may_enter(guest_task) {
2313                bail!(crate::Trap::CannotEnterComponent);
2314            }
2315
2316            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2317        };
2318
2319        // Make the new thread the current one so that `Self::start_call` knows
2320        // which one to start.
2321        state.guest_thread = Some(QualifiedThreadId {
2322            task: guest_task,
2323            thread: guest_thread,
2324        });
2325        log::trace!(
2326            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2327        );
2328
2329        Ok(())
2330    }
2331
2332    /// Call the specified callback function for an async-lifted export.
2333    ///
2334    /// SAFETY: `function` must be a valid reference to a guest function of the
2335    /// correct signature for a callback.
2336    unsafe fn call_callback<T>(
2337        self,
2338        mut store: StoreContextMut<T>,
2339        callee_instance: RuntimeComponentInstanceIndex,
2340        function: SendSyncPtr<VMFuncRef>,
2341        event: Event,
2342        handle: u32,
2343        may_enter_after_call: bool,
2344    ) -> Result<u32> {
2345        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2346
2347        let (ordinal, result) = event.parts();
2348        let params = &mut [
2349            ValRaw::u32(ordinal),
2350            ValRaw::u32(handle),
2351            ValRaw::u32(result),
2352        ];
2353        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2354        // `wasmtime-cranelift`-generated fused adapter code or
2355        // `component::Options`.  Per `wasmparser` callback signature
2356        // validation, we know it takes three parameters and returns one.
2357        unsafe {
2358            flags.set_may_enter(false);
2359            crate::Func::call_unchecked_raw(
2360                &mut store,
2361                function.as_non_null(),
2362                params.as_mut_slice().into(),
2363            )?;
2364            flags.set_may_enter(may_enter_after_call);
2365        }
2366        Ok(params[0].get_u32())
2367    }
2368
    /// Start a guest->guest call previously prepared using
    /// `Self::prepare_call`.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
    /// this function immediately after calling `Self::prepare_call`.
    ///
    /// The call is queued as a work item and the calling fiber is suspended
    /// until the callee reports a status.  If `storage` is `None` the caller
    /// is treated as async: this function stops waiting after the first
    /// status arrives, allocating a subtask handle in the caller's handle
    /// table if that status is not `Status::Returned`.  If `storage` is `Some`
    /// the caller is treated as sync: this function keeps waiting until the
    /// status is `Status::Returned` and then copies the stashed result, if
    /// any, into `storage[0]`.
    ///
    /// Returns `status.pack(waitable)`, where `waitable` is the subtask handle
    /// allocated above (or `None`).
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, if `callee` is null, if
    /// `param_count` exceeds `MAX_FLAT_PARAMS`, if `result_count` exceeds
    /// `MAX_FLAT_RESULTS`, or if the current task's caller is not
    /// `Caller::Guest`.
    ///
    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
    /// functions with the appropriate signatures for the current guest task.
    /// If this is a call to an async-lowered import, the actual call may be
    /// deferred and run after this function returns, in which case the pointer
    /// arguments must also be valid when the call happens.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // A missing `storage` buffer is what identifies an async caller.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        // The current thread is the callee's thread (see the "popped current
        // thread" trace at the end of this function, which restores the
        // caller).
        let guest_thread = state.guest_thread.unwrap();
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let may_enter_after_call = state
            .get_mut(guest_thread.task)?
            .call_post_return_automatically();
        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
        let param_count = usize::try_from(param_count).unwrap();
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count).unwrap();
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if !callback.is_null() {
            // We're calling an async-lifted export with a callback, so store
            // the callback and related context as part of the task so we can
            // call it later when needed.
            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
                let store = token.as_context_mut(store);
                // NOTE(review): soundness here presumably rests on this
                // function's own safety contract, i.e. `callback` being a
                // valid callback function that is still valid whenever this
                // closure runs — confirm.
                unsafe {
                    self.call_callback::<T>(
                        store,
                        runtime_instance,
                        callback,
                        event,
                        handle,
                        may_enter_after_call,
                    )
                }
            }));
        }

        let Caller::Guest {
            thread: caller,
            instance: runtime_instance,
        } = &task.caller
        else {
            // As of this writing, `start_call` is only used for guest->guest
            // calls.
            unreachable!()
        };
        // Copy these out so the borrow of `task` (and thus `state`) ends here.
        let caller = *caller;
        let caller_instance = *runtime_instance;

        let callee_instance = task.instance;

        // Instance flags are only passed along when a callback was supplied.
        let instance_flags = if callback.is_null() {
            None
        } else {
            Some(self.id().get(store.0).instance_flags(callee_instance.index))
        };

        // Queue the call as a "high priority" work item.
        //
        // NOTE(review): `queue_call`'s safety requirements are not visible
        // here; presumably they are covered by this function's own contract on
        // `callee`, `callback` and `post_return` — confirm.
        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                instance_flags,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Use the caller's `GuestTask::sync_call_set` to register interest in
        // the subtask...
        let guest_waitable = Waitable::Guest(guest_thread.task);
        // Remember the set the waitable currently belongs to so it can be
        // re-joined once we're done waiting.
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        // ... and suspend this fiber temporarily while we wait for it to start.
        //
        // Note that we _could_ call the callee directly using the current fiber
        // rather than suspend this one, but that would make reasoning about the
        // event loop more complicated and is probably only worth doing if
        // there's a measurable performance benefit.  In addition, it would mean
        // blocking the caller if the callee calls a blocking sync-lowered
        // import, and as of this writing the spec says we must not do that.
        //
        // Alternatively, the fused adapter code could be modified to call the
        // callee directly without calling a host-provided intrinsic at all (in
        // which case it would need to do its own, inline backpressure checks,
        // etc.).  Again, we'd want to see a measurable performance benefit
        // before committing to such an optimization.  And again, we'd need to
        // update the spec to allow that.
        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // Normally, `StoreOpaque::suspend` would assert it's being
                // called from a context where blocking is allowed.  However, if
                // `async_caller` is `true`, we'll only "block" long enough for
                // the callee to start, i.e. we won't repeat this loop, so we
                // tell `suspend` it's okay even if we're not allowed to block.
                // Alternatively, if the callee is not an async function, then
                // we know it won't block anyway.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            // Only `Event::Subtask` events are expected on this waitable;
            // anything else (including no event at all) is treated as a bug.
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                unreachable!();
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // It returned, so we can stop waiting.
                break (status, None);
            } else if async_caller {
                // It hasn't returned yet, but the caller is calling via an
                // async-lowered import, so we generate a handle for the task
                // waitable and return the status.
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_guest(guest_thread.task.rep())?;
                // Record the handle on the task itself as well.
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // The callee hasn't returned yet, and the caller is calling via
                // a sync-lowered import, so we loop and keep waiting until the
                // callee returns.
            }
        };

        let state = store.0.concurrent_state_mut();

        // Put the waitable back into whichever set (if any) it belonged to
        // before we temporarily joined it to the caller's `sync_call_set`.
        guest_waitable.join(state, old_set)?;

        if let Some(storage) = storage {
            // The caller used a sync-lowered import to call an async-lifted
            // export, in which case the result, if any, has been stashed in
            // `GuestTask::sync_result`.
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take() {
                // The outer `Some` means a result was stashed; the inner
                // `Option` is `Some` only when there is a value to write back.
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                // If the task has already exited and nothing else requires it
                // to stay around, remove it now.
                if task.exited {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
            }
        }

        // Reset the current thread to point to the caller as it resumes control.
        state.guest_thread = Some(caller);
        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        Ok(status.pack(waitable))
    }
2567
    /// Poll the specified future once on behalf of a guest->host call using an
    /// async-lowered import.
    ///
    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
    /// `Pending`, add it to the set of futures to be polled as part of this
    /// instance's event loop until it completes, and then return
    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
    ///
    /// Whether the future returns `Ready` immediately or later, the `lower`
    /// function will be used to lower the result, if any, into the guest caller's
    /// stack and linear memory unless the task has been cancelled.
    ///
    /// A `HostTask` is pushed for the call in either case; it is deleted again
    /// before returning if the future completed on the first poll.
    ///
    /// # Errors
    ///
    /// Propagates errors from the future itself, from `lower`, and from the
    /// task/handle table operations.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, or if no call context can
    /// be popped after the future returns `Pending`.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        caller_instance: RuntimeComponentInstanceIndex,
        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();

        // Create an abortable future which hooks calls to poll and manages call
        // context state for the future.
        let (join_handle, future) = JoinHandle::run(async move {
            let mut future = pin!(future);
            // Holds the call context while the future is parked between polls;
            // `None` while the future is actually being polled (and before the
            // first poll).
            let mut call_context = None;
            future::poll_fn(move |cx| {
                // Push the call context for managing any resource borrows
                // for the task.
                tls::get(|store| {
                    if let Some(call_context) = call_context.take() {
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .push(call_context);
                    }
                });

                let result = future.as_mut().poll(cx);

                if result.is_pending() {
                    // Pop the call context for managing any resource
                    // borrows for the task.
                    tls::get(|store| {
                        call_context = Some(
                            token
                                .as_context_mut(store)
                                .0
                                .component_resource_state()
                                .0
                                .pop()
                                .unwrap(),
                        );
                    });
                }
                result
            })
            .await
        });

        // We create a new host task even though it might complete immediately
        // (in which case we won't need to pass a waitable back to the guest).
        // If it does complete immediately, we'll remove it before we return.
        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

        log::trace!("new host task child of {caller:?}: {task:?}");

        let mut future = Box::pin(future);

        // Finally, poll the future.  We can use a dummy `Waker` here because
        // we'll add the future to `ConcurrentState::futures` and poll it
        // automatically from the event loop if it doesn't complete immediately
        // here.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            // `None` is what the wrapped future yields when the task was
            // cancelled (see the `Pending` arm below); that is not expected to
            // be observable on the very first poll.
            Poll::Ready(None) => unreachable!(),
            Poll::Ready(Some(result)) => {
                // It finished immediately; lower the result and delete the
                // task.
                //
                // NOTE(review): if `result` is an error or `lower` fails, we
                // return before the `delete` below, leaving the host task in
                // the table — presumably acceptable because the error aborts
                // the guest call; confirm.
                lower(store.as_context_mut(), result?)?;
                log::trace!("delete host task {task:?} (already ready)");
                store.0.concurrent_state_mut().delete(task)?;
                None
            }
            Poll::Pending => {
                // It hasn't finished yet; add the future to
                // `ConcurrentState::futures` so it will be polled by the event
                // loop and allocate a waitable handle to return to the guest.

                // Wrap the future in a closure responsible for lowering the result into
                // the guest's stack and memory, as well as notifying any waiters that
                // the task returned.
                let future =
                    Box::pin(async move {
                        let result = match future.await {
                            Some(result) => result?,
                            // Task was cancelled; nothing left to do.
                            None => return Ok(()),
                        };
                        tls::get(move |store| {
                            // Here we schedule a task to run on a worker fiber to do
                            // the lowering since it may involve a call to the guest's
                            // realloc function.  This is necessary because calling the
                            // guest while there are host embedder frames on the stack
                            // is unsound.
                            store.concurrent_state_mut().push_high_priority(
                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                    lower(token.as_context_mut(store), result)?;
                                    let state = store.concurrent_state_mut();
                                    // Clearing the join handle marks the host
                                    // task as resolved (`subtask_drop` refuses
                                    // to drop a host task whose join handle is
                                    // still present).
                                    state.get_mut(task)?.join_handle.take();
                                    Waitable::Host(task).set_event(
                                        state,
                                        Some(Event::Subtask {
                                            status: Status::Returned,
                                        }),
                                    )
                                }))),
                            );
                            Ok(())
                        })
                    });

                store.0.concurrent_state_mut().push_future(future);
                // Allocate a subtask handle in the caller instance's handle
                // table and record it on the task.
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_host(task.rep())?;
                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
                log::trace!(
                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
                );
                Some(handle)
            }
        })
    }
2714
2715    /// Implements the `task.return` intrinsic, lifting the result for the
2716    /// current guest task.
2717    pub(crate) fn task_return(
2718        self,
2719        store: &mut dyn VMStore,
2720        caller: RuntimeComponentInstanceIndex,
2721        ty: TypeTupleIndex,
2722        options: OptionsIndex,
2723        storage: &[ValRaw],
2724    ) -> Result<()> {
2725        self.id().get(store).check_may_leave(caller)?;
2726        let state = store.concurrent_state_mut();
2727        let guest_thread = state.guest_thread.unwrap();
2728        let lift = state
2729            .get_mut(guest_thread.task)?
2730            .lift_result
2731            .take()
2732            .ok_or_else(|| {
2733                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2734            })?;
2735        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2736
2737        let CanonicalOptions {
2738            string_encoding,
2739            data_model,
2740            ..
2741        } = &self.id().get(store).component().env_component().options[options];
2742
2743        let invalid = ty != lift.ty
2744            || string_encoding != &lift.string_encoding
2745            || match data_model {
2746                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2747                    Some(memory) => {
2748                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2749                        let actual = self.id().get(store).runtime_memory(memory);
2750                        expected != actual.as_ptr()
2751                    }
2752                    // Memory not specified, meaning it didn't need to be
2753                    // specified per validation, so not invalid.
2754                    None => false,
2755                },
2756                // Always invalid as this isn't supported.
2757                CanonicalOptionsDataModel::Gc { .. } => true,
2758            };
2759
2760        if invalid {
2761            bail!("invalid `task.return` signature and/or options for current task");
2762        }
2763
2764        log::trace!("task.return for {guest_thread:?}");
2765
2766        let result = (lift.lift)(store, storage)?;
2767        self.task_complete(
2768            store,
2769            guest_thread.task,
2770            result,
2771            Status::Returned,
2772            ValRaw::i32(0),
2773        )
2774    }
2775
2776    /// Implements the `task.cancel` intrinsic.
2777    pub(crate) fn task_cancel(
2778        self,
2779        store: &mut StoreOpaque,
2780        caller: RuntimeComponentInstanceIndex,
2781    ) -> Result<()> {
2782        self.id().get(store).check_may_leave(caller)?;
2783        let state = store.concurrent_state_mut();
2784        let guest_thread = state.guest_thread.unwrap();
2785        let task = state.get_mut(guest_thread.task)?;
2786        if !task.cancel_sent {
2787            bail!("`task.cancel` called by task which has not been cancelled")
2788        }
2789        _ = task.lift_result.take().ok_or_else(|| {
2790            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2791        })?;
2792
2793        assert!(task.result.is_none());
2794
2795        log::trace!("task.cancel for {guest_thread:?}");
2796
2797        self.task_complete(
2798            store,
2799            guest_thread.task,
2800            Box::new(DummyResult),
2801            Status::ReturnCancelled,
2802            ValRaw::i32(0),
2803        )
2804    }
2805
2806    /// Complete the specified guest task (i.e. indicate that it has either
2807    /// returned a (possibly empty) result or cancelled itself).
2808    ///
2809    /// This will return any resource borrows and notify any current or future
2810    /// waiters that the task has completed.
2811    fn task_complete(
2812        self,
2813        store: &mut StoreOpaque,
2814        guest_task: TableId<GuestTask>,
2815        result: Box<dyn Any + Send + Sync>,
2816        status: Status,
2817        post_return_arg: ValRaw,
2818    ) -> Result<()> {
2819        if store
2820            .concurrent_state_mut()
2821            .get_mut(guest_task)?
2822            .call_post_return_automatically()
2823        {
2824            let (calls, host_table, _, instance) =
2825                store.component_resource_state_with_instance(self);
2826            ResourceTables {
2827                calls,
2828                host_table: Some(host_table),
2829                guest: Some(instance.instance_states()),
2830            }
2831            .exit_call()?;
2832        } else {
2833            // As of this writing, the only scenario where `call_post_return_automatically`
2834            // would be false for a `GuestTask` is for host-to-guest calls using
2835            // `[Typed]Func::call_async`, in which case the `function_index`
2836            // should be a non-`None` value.
2837            let function_index = store
2838                .concurrent_state_mut()
2839                .get_mut(guest_task)?
2840                .function_index
2841                .unwrap();
2842            self.id()
2843                .get_mut(store)
2844                .post_return_arg_set(function_index, post_return_arg);
2845        }
2846
2847        let state = store.concurrent_state_mut();
2848        let task = state.get_mut(guest_task)?;
2849
2850        if let Caller::Host { tx, .. } = &mut task.caller {
2851            if let Some(tx) = tx.take() {
2852                _ = tx.send(result);
2853            }
2854        } else {
2855            task.result = Some(result);
2856            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2857        }
2858
2859        Ok(())
2860    }
2861
2862    /// Implements the `waitable-set.new` intrinsic.
2863    pub(crate) fn waitable_set_new(
2864        self,
2865        store: &mut StoreOpaque,
2866        caller_instance: RuntimeComponentInstanceIndex,
2867    ) -> Result<u32> {
2868        self.id().get_mut(store).check_may_leave(caller_instance)?;
2869        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2870        let handle = store
2871            .handle_table(RuntimeInstance {
2872                instance: self.id().instance(),
2873                index: caller_instance,
2874            })
2875            .waitable_set_insert(set.rep())?;
2876        log::trace!("new waitable set {set:?} (handle {handle})");
2877        Ok(handle)
2878    }
2879
2880    /// Implements the `waitable-set.drop` intrinsic.
2881    pub(crate) fn waitable_set_drop(
2882        self,
2883        store: &mut StoreOpaque,
2884        caller_instance: RuntimeComponentInstanceIndex,
2885        set: u32,
2886    ) -> Result<()> {
2887        self.id().get_mut(store).check_may_leave(caller_instance)?;
2888        let rep = store
2889            .handle_table(RuntimeInstance {
2890                instance: self.id().instance(),
2891                index: caller_instance,
2892            })
2893            .waitable_set_remove(set)?;
2894
2895        log::trace!("drop waitable set {rep} (handle {set})");
2896
2897        let set = store
2898            .concurrent_state_mut()
2899            .delete(TableId::<WaitableSet>::new(rep))?;
2900
2901        if !set.waiting.is_empty() {
2902            bail!("cannot drop waitable set with waiters");
2903        }
2904
2905        Ok(())
2906    }
2907
2908    /// Implements the `waitable.join` intrinsic.
2909    pub(crate) fn waitable_join(
2910        self,
2911        store: &mut StoreOpaque,
2912        caller_instance: RuntimeComponentInstanceIndex,
2913        waitable_handle: u32,
2914        set_handle: u32,
2915    ) -> Result<()> {
2916        let mut instance = self.id().get_mut(store);
2917        instance.check_may_leave(caller_instance)?;
2918        let waitable =
2919            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2920
2921        let set = if set_handle == 0 {
2922            None
2923        } else {
2924            let set = instance.instance_states().0[caller_instance]
2925                .handle_table()
2926                .waitable_set_rep(set_handle)?;
2927
2928            Some(TableId::<WaitableSet>::new(set))
2929        };
2930
2931        log::trace!(
2932            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2933        );
2934
2935        waitable.join(store.concurrent_state_mut(), set)
2936    }
2937
2938    /// Implements the `subtask.drop` intrinsic.
2939    pub(crate) fn subtask_drop(
2940        self,
2941        store: &mut StoreOpaque,
2942        caller_instance: RuntimeComponentInstanceIndex,
2943        task_id: u32,
2944    ) -> Result<()> {
2945        self.id().get_mut(store).check_may_leave(caller_instance)?;
2946        self.waitable_join(store, caller_instance, task_id, 0)?;
2947
2948        let (rep, is_host) = store
2949            .handle_table(RuntimeInstance {
2950                instance: self.id().instance(),
2951                index: caller_instance,
2952            })
2953            .subtask_remove(task_id)?;
2954
2955        let concurrent_state = store.concurrent_state_mut();
2956        let (waitable, expected_caller_instance, delete) = if is_host {
2957            let id = TableId::<HostTask>::new(rep);
2958            let task = concurrent_state.get_mut(id)?;
2959            if task.join_handle.is_some() {
2960                bail!("cannot drop a subtask which has not yet resolved");
2961            }
2962            (Waitable::Host(id), task.caller_instance, true)
2963        } else {
2964            let id = TableId::<GuestTask>::new(rep);
2965            let task = concurrent_state.get_mut(id)?;
2966            if task.lift_result.is_some() {
2967                bail!("cannot drop a subtask which has not yet resolved");
2968            }
2969            if let Caller::Guest { instance, .. } = &task.caller {
2970                (Waitable::Guest(id), *instance, task.exited)
2971            } else {
2972                unreachable!()
2973            }
2974        };
2975
2976        waitable.common(concurrent_state)?.handle = None;
2977
2978        if waitable.take_event(concurrent_state)?.is_some() {
2979            bail!("cannot drop a subtask with an undelivered event");
2980        }
2981
2982        if delete {
2983            waitable.delete_from(concurrent_state)?;
2984        }
2985
2986        // Since waitables can neither be passed between instances nor forged,
2987        // this should never fail unless there's a bug in Wasmtime, but we check
2988        // here to be sure:
2989        assert_eq!(expected_caller_instance, caller_instance);
2990        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2991        Ok(())
2992    }
2993
2994    /// Implements the `waitable-set.wait` intrinsic.
2995    pub(crate) fn waitable_set_wait(
2996        self,
2997        store: &mut StoreOpaque,
2998        caller: RuntimeComponentInstanceIndex,
2999        options: OptionsIndex,
3000        set: u32,
3001        payload: u32,
3002    ) -> Result<u32> {
3003        self.id().get(store).check_may_leave(caller)?;
3004
3005        if !self.options(store, options).async_ {
3006            // The caller may only call `waitable-set.wait` from an async task
3007            // (i.e. a task created via a call to an async export).
3008            // Otherwise, we'll trap.
3009            store.concurrent_state_mut().check_blocking()?;
3010        }
3011
3012        let &CanonicalOptions {
3013            cancellable,
3014            instance: caller_instance,
3015            ..
3016        } = &self.id().get(store).component().env_component().options[options];
3017        let rep = store
3018            .handle_table(RuntimeInstance {
3019                instance: self.id().instance(),
3020                index: caller_instance,
3021            })
3022            .waitable_set_rep(set)?;
3023
3024        self.waitable_check(
3025            store,
3026            cancellable,
3027            WaitableCheck::Wait,
3028            WaitableCheckParams {
3029                set: TableId::new(rep),
3030                options,
3031                payload,
3032            },
3033        )
3034    }
3035
3036    /// Implements the `waitable-set.poll` intrinsic.
3037    pub(crate) fn waitable_set_poll(
3038        self,
3039        store: &mut StoreOpaque,
3040        caller: RuntimeComponentInstanceIndex,
3041        options: OptionsIndex,
3042        set: u32,
3043        payload: u32,
3044    ) -> Result<u32> {
3045        self.id().get(store).check_may_leave(caller)?;
3046
3047        let &CanonicalOptions {
3048            cancellable,
3049            instance: caller_instance,
3050            ..
3051        } = &self.id().get(store).component().env_component().options[options];
3052        let rep = store
3053            .handle_table(RuntimeInstance {
3054                instance: self.id().instance(),
3055                index: caller_instance,
3056            })
3057            .waitable_set_rep(set)?;
3058
3059        self.waitable_check(
3060            store,
3061            cancellable,
3062            WaitableCheck::Poll,
3063            WaitableCheckParams {
3064                set: TableId::new(rep),
3065                options,
3066                payload,
3067            },
3068        )
3069    }
3070
3071    /// Implements the `thread.index` intrinsic.
3072    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3073        let thread_id = store
3074            .concurrent_state_mut()
3075            .guest_thread
3076            .ok_or_else(|| anyhow!("no current thread"))?
3077            .thread;
3078        // The unwrap is safe because `instance_rep` must be `Some` by this point
3079        Ok(store
3080            .concurrent_state_mut()
3081            .get_mut(thread_id)?
3082            .instance_rep
3083            .unwrap())
3084    }
3085
3086    /// Implements the `thread.new-indirect` intrinsic.
3087    pub(crate) fn thread_new_indirect<T: 'static>(
3088        self,
3089        mut store: StoreContextMut<T>,
3090        runtime_instance: RuntimeComponentInstanceIndex,
3091        _func_ty_idx: TypeFuncIndex, // currently unused
3092        start_func_table_idx: RuntimeTableIndex,
3093        start_func_idx: u32,
3094        context: i32,
3095    ) -> Result<u32> {
3096        self.id().get(store.0).check_may_leave(runtime_instance)?;
3097
3098        log::trace!("creating new thread");
3099
3100        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3101        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3102        let callee = instance
3103            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3104            .ok_or_else(|| {
3105                anyhow!("the start function index points to an uninitialized function")
3106            })?;
3107        if callee.type_index(store.0) != start_func_ty.type_index() {
3108            bail!(
3109                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3110            );
3111        }
3112
3113        let token = StoreToken::new(store.as_context_mut());
3114        let start_func = Box::new(
3115            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3116                let old_thread = store
3117                    .concurrent_state_mut()
3118                    .guest_thread
3119                    .replace(guest_thread);
3120                log::trace!(
3121                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3122                );
3123
3124                store.maybe_push_call_context(guest_thread.task)?;
3125
3126                let mut store = token.as_context_mut(store);
3127                let mut params = [ValRaw::i32(context)];
3128                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3129                // on a separate fiber if we're running in an async store.
3130                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3131
3132                store.0.maybe_pop_call_context(guest_thread.task)?;
3133
3134                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3135                log::trace!("explicit thread {guest_thread:?} completed");
3136                let state = store.0.concurrent_state_mut();
3137                let task = state.get_mut(guest_thread.task)?;
3138                if task.threads.is_empty() && !task.returned_or_cancelled() {
3139                    bail!(Trap::NoAsyncResult);
3140                }
3141                state.guest_thread = old_thread;
3142                old_thread
3143                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3144                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3145                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3146                }
3147                log::trace!("thread start: restored {old_thread:?} as current thread");
3148
3149                Ok(())
3150            },
3151        );
3152
3153        let state = store.0.concurrent_state_mut();
3154        let current_thread = state.guest_thread.unwrap();
3155        let parent_task = current_thread.task;
3156
3157        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3158        let thread_id = state.push(new_thread)?;
3159        state.get_mut(parent_task)?.threads.insert(thread_id);
3160
3161        log::trace!("new thread with id {thread_id:?} created");
3162
3163        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3164    }
3165
3166    pub(crate) fn resume_suspended_thread(
3167        self,
3168        store: &mut StoreOpaque,
3169        runtime_instance: RuntimeComponentInstanceIndex,
3170        thread_idx: u32,
3171        high_priority: bool,
3172    ) -> Result<()> {
3173        let thread_id =
3174            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3175        let state = store.concurrent_state_mut();
3176        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3177        let thread = state.get_mut(guest_thread.thread)?;
3178
3179        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3180            GuestThreadState::NotStartedExplicit(start_func) => {
3181                log::trace!("starting thread {guest_thread:?}");
3182                let guest_call = WorkItem::GuestCall(GuestCall {
3183                    thread: guest_thread,
3184                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3185                        start_func(store, guest_thread)
3186                    })),
3187                });
3188                store
3189                    .concurrent_state_mut()
3190                    .push_work_item(guest_call, high_priority);
3191            }
3192            GuestThreadState::Suspended(fiber) => {
3193                log::trace!("resuming thread {thread_id:?} that was suspended");
3194                store
3195                    .concurrent_state_mut()
3196                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3197            }
3198            _ => {
3199                bail!("cannot resume thread which is not suspended");
3200            }
3201        }
3202        Ok(())
3203    }
3204
3205    fn add_guest_thread_to_instance_table(
3206        self,
3207        thread_id: TableId<GuestThread>,
3208        store: &mut StoreOpaque,
3209        runtime_instance: RuntimeComponentInstanceIndex,
3210    ) -> Result<u32> {
3211        let guest_id = store
3212            .handle_table(RuntimeInstance {
3213                instance: self.id().instance(),
3214                index: runtime_instance,
3215            })
3216            .guest_thread_insert(thread_id.rep())?;
3217        store
3218            .concurrent_state_mut()
3219            .get_mut(thread_id)?
3220            .instance_rep = Some(guest_id);
3221        Ok(guest_id)
3222    }
3223
3224    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3225    /// and `thread.switch-to` intrinsics.
3226    pub(crate) fn suspension_intrinsic(
3227        self,
3228        store: &mut StoreOpaque,
3229        caller: RuntimeComponentInstanceIndex,
3230        cancellable: bool,
3231        yielding: bool,
3232        to_thread: Option<u32>,
3233    ) -> Result<WaitResult> {
3234        self.id().get(store).check_may_leave(caller)?;
3235
3236        if to_thread.is_none() {
3237            let state = store.concurrent_state_mut();
3238            if yielding {
3239                // This is a `thread.yield` call
3240                if !state.may_block(state.guest_thread.unwrap().task) {
3241                    // The spec defines `thread.yield` to be a no-op in a
3242                    // non-blocking context, so we return immediately without giving
3243                    // any other thread a chance to run.
3244                    return Ok(WaitResult::Completed);
3245                }
3246            } else {
3247                // The caller may only call `thread.suspend` from an async task
3248                // (i.e. a task created via a call to an async export).
3249                // Otherwise, we'll trap.
3250                state.check_blocking()?;
3251            }
3252        }
3253
3254        // There could be a pending cancellation from a previous uncancellable wait
3255        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3256            return Ok(WaitResult::Cancelled);
3257        }
3258
3259        if let Some(thread) = to_thread {
3260            self.resume_suspended_thread(store, caller, thread, true)?;
3261        }
3262
3263        let state = store.concurrent_state_mut();
3264        let guest_thread = state.guest_thread.unwrap();
3265        let reason = if yielding {
3266            SuspendReason::Yielding {
3267                thread: guest_thread,
3268            }
3269        } else {
3270            SuspendReason::ExplicitlySuspending {
3271                thread: guest_thread,
3272                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3273                // we're handling a `thread.switch-to` call; otherwise it would
3274                // panic if we called it in a non-blocking context.
3275                skip_may_block_check: to_thread.is_some(),
3276            }
3277        };
3278
3279        store.suspend(reason)?;
3280
3281        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3282            Ok(WaitResult::Cancelled)
3283        } else {
3284            Ok(WaitResult::Completed)
3285        }
3286    }
3287
3288    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3289    fn waitable_check(
3290        self,
3291        store: &mut StoreOpaque,
3292        cancellable: bool,
3293        check: WaitableCheck,
3294        params: WaitableCheckParams,
3295    ) -> Result<u32> {
3296        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3297
3298        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3299
3300        let state = store.concurrent_state_mut();
3301        let task = state.get_mut(guest_thread.task)?;
3302
3303        // If we're waiting, and there are no events immediately available,
3304        // suspend the fiber until that changes.
3305        match &check {
3306            WaitableCheck::Wait => {
3307                let set = params.set;
3308
3309                if (task.event.is_none()
3310                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3311                    && state.get_mut(set)?.ready.is_empty()
3312                {
3313                    if cancellable {
3314                        let old = state
3315                            .get_mut(guest_thread.thread)?
3316                            .wake_on_cancel
3317                            .replace(set);
3318                        assert!(old.is_none());
3319                    }
3320
3321                    store.suspend(SuspendReason::Waiting {
3322                        set,
3323                        thread: guest_thread,
3324                        skip_may_block_check: false,
3325                    })?;
3326                }
3327            }
3328            WaitableCheck::Poll => {}
3329        }
3330
3331        log::trace!(
3332            "waitable check for {guest_thread:?}; set {:?}, part two",
3333            params.set
3334        );
3335
3336        // Deliver any pending events to the guest and return.
3337        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3338
3339        let (ordinal, handle, result) = match &check {
3340            WaitableCheck::Wait => {
3341                let (event, waitable) = event.unwrap();
3342                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3343                let (ordinal, result) = event.parts();
3344                (ordinal, handle, result)
3345            }
3346            WaitableCheck::Poll => {
3347                if let Some((event, waitable)) = event {
3348                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3349                    let (ordinal, result) = event.parts();
3350                    (ordinal, handle, result)
3351                } else {
3352                    log::trace!(
3353                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3354                        guest_thread.task,
3355                        params.set
3356                    );
3357                    let (ordinal, result) = Event::None.parts();
3358                    (ordinal, 0, result)
3359                }
3360            }
3361        };
3362        let memory = self.options_memory_mut(store, params.options);
3363        let ptr = func::validate_inbounds_dynamic(
3364            &CanonicalAbiInfo::POINTER_PAIR,
3365            memory,
3366            &ValRaw::u32(params.payload),
3367        )?;
3368        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3369        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3370        Ok(ordinal)
3371    }
3372
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// `task_id` is a subtask handle in `caller_instance`'s table; the handle
    /// is looked up but not removed.  `async_` says whether the intrinsic was
    /// lowered with the `async` option.
    ///
    /// Outcomes visible in this function:
    ///
    /// - Host subtask that still has a join handle: the handle is aborted and
    ///   `Status::ReturnCancelled` is returned.
    /// - Guest subtask whose parameters have not yet been lowered: it is
    ///   removed from the callee instance's pending queue and
    ///   `Status::StartCancelled` is returned.
    /// - Guest subtask that has started but not yet returned or been
    ///   cancelled: an `Event::Cancelled` is recorded on the task.  If the
    ///   task still has not returned or been cancelled afterwards, this
    ///   returns `BLOCKED` when `async_` is set and otherwise waits for the
    ///   subtask's event.
    /// - In the remaining cases the subtask's pending event is taken and its
    ///   `Returned`/`ReturnCancelled` status is returned.
    ///
    /// Returns an error if the caller may not leave its instance, if a
    /// synchronous call is made from a context that may not block, if the
    /// handle fails to resolve, or if the terminal status was already
    /// delivered.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        self.id().get(store).check_may_leave(caller_instance)?;

        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.concurrent_state_mut().check_blocking()?;
        }

        let (rep, is_host) = store
            .handle_table(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .subtask_rep(task_id)?;
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { instance, .. } =
                &store.concurrent_state_mut().get_mut(id)?.caller
            {
                (Waitable::Guest(id), *instance)
            } else {
                unreachable!()
            }
        };
        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        assert_eq!(expected_caller_instance, caller_instance);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            // A host task that still owns its join handle has not finished;
            // abort it and report the cancellation immediately.  If the handle
            // is already gone, fall through to the event check at the end.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                task.lower_params = None;
                task.lift_result = None;

                // Not yet started; cancel and remove from pending
                let instance = task.instance;
                let pending = &mut store.instance_state(instance).pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // If there were no pending threads for this task, we're in an error state
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                // Look for a thread of the task that registered a set in
                // `wake_on_cancel`; only the first such thread is woken (note
                // the `break` below).
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        // Remove the thread from the set's waiters and turn
                        // its wait mode into a high-priority work item.
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield the calling thread (presumably so the woken
                        // thread gets a chance to handle the cancellation
                        // before the re-check below -- confirm against
                        // `StoreOpaque::suspend`).
                        store.suspend(SuspendReason::Yielding { thread: caller })?;
                        break;
                    }
                }

                // Re-fetch the state: the earlier borrow cannot be held across
                // the `store.suspend` call above.
                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // Only a terminal `Returned`/`ReturnCancelled` subtask event is
        // acceptable here; anything else (including no event) is an error.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3507
3508    pub(crate) fn context_get(
3509        self,
3510        store: &mut StoreOpaque,
3511        caller: RuntimeComponentInstanceIndex,
3512        slot: u32,
3513    ) -> Result<u32> {
3514        self.id().get(store).check_may_leave(caller)?;
3515        store.concurrent_state_mut().context_get(slot)
3516    }
3517
3518    pub(crate) fn context_set(
3519        self,
3520        store: &mut StoreOpaque,
3521        caller: RuntimeComponentInstanceIndex,
3522        slot: u32,
3523        value: u32,
3524    ) -> Result<()> {
3525        self.id().get(store).check_may_leave(caller)?;
3526        store.concurrent_state_mut().context_set(slot, value)
3527    }
3528}
3529
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is used only when at least one of them is async.
    ///
    /// `storage` must point to at least `storage_len` items; the `StoreInner`
    /// implementation copies them out as the call's parameters.  That
    /// implementation also treats `result_count` as either one of the
    /// `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` sentinels (for
    /// an async caller) or, for any other value, as the sync caller's result
    /// count.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage` must point to at least `storage_len` items; the `StoreInner`
    /// implementation hands that region to `Instance::start_call` as a
    /// mutable slice.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3700
3701/// SAFETY: See trait docs.
3702impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3703    unsafe fn prepare_call(
3704        &mut self,
3705        instance: Instance,
3706        memory: *mut VMMemoryDefinition,
3707        start: *mut VMFuncRef,
3708        return_: *mut VMFuncRef,
3709        caller_instance: RuntimeComponentInstanceIndex,
3710        callee_instance: RuntimeComponentInstanceIndex,
3711        task_return_type: TypeTupleIndex,
3712        callee_async: bool,
3713        string_encoding: u8,
3714        result_count_or_max_if_async: u32,
3715        storage: *mut ValRaw,
3716        storage_len: usize,
3717    ) -> Result<()> {
3718        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3719        // this method will have ensured that `storage` is a valid
3720        // pointer containing at least `storage_len` items.
3721        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3722
3723        unsafe {
3724            instance.prepare_call(
3725                StoreContextMut(self),
3726                start,
3727                return_,
3728                caller_instance,
3729                callee_instance,
3730                task_return_type,
3731                callee_async,
3732                memory,
3733                string_encoding,
3734                match result_count_or_max_if_async {
3735                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3736                        params,
3737                        has_result: false,
3738                    },
3739                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3740                        params,
3741                        has_result: true,
3742                    },
3743                    result_count => CallerInfo::Sync {
3744                        params,
3745                        result_count,
3746                    },
3747                },
3748            )
3749        }
3750    }
3751
3752    unsafe fn sync_start(
3753        &mut self,
3754        instance: Instance,
3755        callback: *mut VMFuncRef,
3756        callee: *mut VMFuncRef,
3757        param_count: u32,
3758        storage: *mut MaybeUninit<ValRaw>,
3759        storage_len: usize,
3760    ) -> Result<()> {
3761        unsafe {
3762            instance
3763                .start_call(
3764                    StoreContextMut(self),
3765                    callback,
3766                    ptr::null_mut(),
3767                    callee,
3768                    param_count,
3769                    1,
3770                    START_FLAG_ASYNC_CALLEE,
3771                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3772                    // this method will have ensured that `storage` is a valid
3773                    // pointer containing at least `storage_len` items.
3774                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3775                )
3776                .map(drop)
3777        }
3778    }
3779
3780    unsafe fn async_start(
3781        &mut self,
3782        instance: Instance,
3783        callback: *mut VMFuncRef,
3784        post_return: *mut VMFuncRef,
3785        callee: *mut VMFuncRef,
3786        param_count: u32,
3787        result_count: u32,
3788        flags: u32,
3789    ) -> Result<u32> {
3790        unsafe {
3791            instance.start_call(
3792                StoreContextMut(self),
3793                callback,
3794                post_return,
3795                callee,
3796                param_count,
3797                result_count,
3798                flags,
3799                None,
3800            )
3801        }
3802    }
3803
3804    fn future_write(
3805        &mut self,
3806        instance: Instance,
3807        caller: RuntimeComponentInstanceIndex,
3808        ty: TypeFutureTableIndex,
3809        options: OptionsIndex,
3810        future: u32,
3811        address: u32,
3812    ) -> Result<u32> {
3813        instance.id().get(self).check_may_leave(caller)?;
3814        instance
3815            .guest_write(
3816                StoreContextMut(self),
3817                caller,
3818                TransmitIndex::Future(ty),
3819                options,
3820                None,
3821                future,
3822                address,
3823                1,
3824            )
3825            .map(|result| result.encode())
3826    }
3827
3828    fn future_read(
3829        &mut self,
3830        instance: Instance,
3831        caller: RuntimeComponentInstanceIndex,
3832        ty: TypeFutureTableIndex,
3833        options: OptionsIndex,
3834        future: u32,
3835        address: u32,
3836    ) -> Result<u32> {
3837        instance.id().get(self).check_may_leave(caller)?;
3838        instance
3839            .guest_read(
3840                StoreContextMut(self),
3841                caller,
3842                TransmitIndex::Future(ty),
3843                options,
3844                None,
3845                future,
3846                address,
3847                1,
3848            )
3849            .map(|result| result.encode())
3850    }
3851
3852    fn stream_write(
3853        &mut self,
3854        instance: Instance,
3855        caller: RuntimeComponentInstanceIndex,
3856        ty: TypeStreamTableIndex,
3857        options: OptionsIndex,
3858        stream: u32,
3859        address: u32,
3860        count: u32,
3861    ) -> Result<u32> {
3862        instance.id().get(self).check_may_leave(caller)?;
3863        instance
3864            .guest_write(
3865                StoreContextMut(self),
3866                caller,
3867                TransmitIndex::Stream(ty),
3868                options,
3869                None,
3870                stream,
3871                address,
3872                count,
3873            )
3874            .map(|result| result.encode())
3875    }
3876
3877    fn stream_read(
3878        &mut self,
3879        instance: Instance,
3880        caller: RuntimeComponentInstanceIndex,
3881        ty: TypeStreamTableIndex,
3882        options: OptionsIndex,
3883        stream: u32,
3884        address: u32,
3885        count: u32,
3886    ) -> Result<u32> {
3887        instance.id().get(self).check_may_leave(caller)?;
3888        instance
3889            .guest_read(
3890                StoreContextMut(self),
3891                caller,
3892                TransmitIndex::Stream(ty),
3893                options,
3894                None,
3895                stream,
3896                address,
3897                count,
3898            )
3899            .map(|result| result.encode())
3900    }
3901
3902    fn future_drop_writable(
3903        &mut self,
3904        instance: Instance,
3905        caller: RuntimeComponentInstanceIndex,
3906        ty: TypeFutureTableIndex,
3907        writer: u32,
3908    ) -> Result<()> {
3909        instance.id().get(self).check_may_leave(caller)?;
3910        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3911    }
3912
3913    fn flat_stream_write(
3914        &mut self,
3915        instance: Instance,
3916        caller: RuntimeComponentInstanceIndex,
3917        ty: TypeStreamTableIndex,
3918        options: OptionsIndex,
3919        payload_size: u32,
3920        payload_align: u32,
3921        stream: u32,
3922        address: u32,
3923        count: u32,
3924    ) -> Result<u32> {
3925        instance.id().get(self).check_may_leave(caller)?;
3926        instance
3927            .guest_write(
3928                StoreContextMut(self),
3929                caller,
3930                TransmitIndex::Stream(ty),
3931                options,
3932                Some(FlatAbi {
3933                    size: payload_size,
3934                    align: payload_align,
3935                }),
3936                stream,
3937                address,
3938                count,
3939            )
3940            .map(|result| result.encode())
3941    }
3942
3943    fn flat_stream_read(
3944        &mut self,
3945        instance: Instance,
3946        caller: RuntimeComponentInstanceIndex,
3947        ty: TypeStreamTableIndex,
3948        options: OptionsIndex,
3949        payload_size: u32,
3950        payload_align: u32,
3951        stream: u32,
3952        address: u32,
3953        count: u32,
3954    ) -> Result<u32> {
3955        instance.id().get(self).check_may_leave(caller)?;
3956        instance
3957            .guest_read(
3958                StoreContextMut(self),
3959                caller,
3960                TransmitIndex::Stream(ty),
3961                options,
3962                Some(FlatAbi {
3963                    size: payload_size,
3964                    align: payload_align,
3965                }),
3966                stream,
3967                address,
3968                count,
3969            )
3970            .map(|result| result.encode())
3971    }
3972
3973    fn stream_drop_writable(
3974        &mut self,
3975        instance: Instance,
3976        caller: RuntimeComponentInstanceIndex,
3977        ty: TypeStreamTableIndex,
3978        writer: u32,
3979    ) -> Result<()> {
3980        instance.id().get(self).check_may_leave(caller)?;
3981        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3982    }
3983
3984    fn error_context_debug_message(
3985        &mut self,
3986        instance: Instance,
3987        caller: RuntimeComponentInstanceIndex,
3988        ty: TypeComponentLocalErrorContextTableIndex,
3989        options: OptionsIndex,
3990        err_ctx_handle: u32,
3991        debug_msg_address: u32,
3992    ) -> Result<()> {
3993        instance.id().get(self).check_may_leave(caller)?;
3994        instance.error_context_debug_message(
3995            StoreContextMut(self),
3996            ty,
3997            options,
3998            err_ctx_handle,
3999            debug_msg_address,
4000        )
4001    }
4002
4003    fn thread_new_indirect(
4004        &mut self,
4005        instance: Instance,
4006        caller: RuntimeComponentInstanceIndex,
4007        func_ty_idx: TypeFuncIndex,
4008        start_func_table_idx: RuntimeTableIndex,
4009        start_func_idx: u32,
4010        context: i32,
4011    ) -> Result<u32> {
4012        instance.thread_new_indirect(
4013            StoreContextMut(self),
4014            caller,
4015            func_ty_idx,
4016            start_func_table_idx,
4017            start_func_idx,
4018            context,
4019        )
4020    }
4021}
4022
/// Boxed, pinned, `Send + 'static` future that resolves to `Result<()>`.
/// NOTE(review): presumably the type-erased future backing a host task (per
/// the name) — confirm at the use sites.
4023type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4024
4025/// Represents the state of a pending host task.
4026struct HostTask {
    /// Waitable bookkeeping for this task; see `WaitableCommon`.
4027    common: WaitableCommon,
    /// Component instance index recorded for the caller of this task.
4028    caller_instance: RuntimeComponentInstanceIndex,
    /// Join handle supplied at construction, if any.
    /// NOTE(review): presumably used to cancel or await the task — confirm.
4029    join_handle: Option<JoinHandle>,
4030}
4031
4032impl HostTask {
4033    fn new(
4034        caller_instance: RuntimeComponentInstanceIndex,
4035        join_handle: Option<JoinHandle>,
4036    ) -> Self {
4037        Self {
4038            common: WaitableCommon::default(),
4039            caller_instance,
4040            join_handle,
4041        }
4042    }
4043}
4044
/// Supplies the type name that `TableDebug` reports for `HostTask` entries.
4045impl TableDebug for HostTask {
4046    fn type_name() -> &'static str {
4047        "HostTask"
4048    }
4049}
4050
/// Type-erased (`Send + Sync + 'static`) closure stored in
/// `GuestTask::callback`, used to call the callback function of an
/// async-lifted export.
///
/// It receives the store, a component instance index, an `Event` and a `u32`,
/// and yields a `u32`.
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible in this part of the file — confirm against the call sites.
4051type CallbackFn = Box<
4052    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
4053        + Send
4054        + Sync
4055        + 'static,
4056>;
4057
4058/// Represents the caller of a given guest task.
4059enum Caller {
4060    /// The host called the guest task.
4061    Host {
4062        /// If present, may be used to deliver the result.
4063        tx: Option<oneshot::Sender<LiftedResult>>,
4064        /// Channel to notify once all subtasks spawned by this caller have
4065        /// completed.
4066        ///
4067        /// Note that we'll never actually send anything to this channel;
4068        /// dropping it when the refcount goes to zero is sufficient to notify
4069        /// the receiver.
4070        exit_tx: Arc<oneshot::Sender<()>>,
4071        /// If true, there's a host future that must be dropped before the task
4072        /// can be deleted.
4073        host_future_present: bool,
4074        /// If true, call `post-return` function (if any) automatically.
4075        call_post_return_automatically: bool,
4076    },
4077    /// Another guest thread called the guest task.
4078    Guest {
4079        /// The id of the caller.
4080        thread: QualifiedThreadId,
4081        /// The instance to use to enforce reentrance rules.
4082        ///
4083        /// Note that this might not be the same as the instance the caller task
4084        /// started executing in given that one or more synchronous guest->guest
4085        /// calls may have occurred involving multiple instances.
4086        instance: RuntimeComponentInstanceIndex,
4087    },
4088}
4089
4090/// Represents a closure and related canonical ABI parameters required to
4091/// validate a `task.return` call at runtime and lift the result.
4092struct LiftResult {
    /// Type-erased closure that lifts the result; see `RawLift`.
4093    lift: RawLift,
    /// Tuple type index associated with the result.
    /// NOTE(review): presumably the type a `task.return` call is validated
    /// against — confirm.
4094    ty: TypeTupleIndex,
    /// Pointer to a memory definition, if any.
    /// NOTE(review): presumably the linear memory used while lifting — confirm.
4095    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding that accompanies the lift.
4096    string_encoding: StringEncoding,
4097}
4098
4099/// The table ID for a guest thread, qualified by the task to which it belongs.
4100///
4101/// This exists to minimize table lookups and the necessity to pass stores around mutably
4102/// for the common case of identifying the task to which a thread belongs.
///
/// The derived ordering compares `task` first and then `thread` (field
/// declaration order).
4103#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4104struct QualifiedThreadId {
    /// The task that owns `thread`.
4105    task: TableId<GuestTask>,
    /// The guest thread itself.
4106    thread: TableId<GuestThread>,
4107}
4108
4109impl QualifiedThreadId {
4110    fn qualify(
4111        state: &mut ConcurrentState,
4112        thread: TableId<GuestThread>,
4113    ) -> Result<QualifiedThreadId> {
4114        Ok(QualifiedThreadId {
4115            task: state.get_mut(thread)?.parent_task,
4116            thread,
4117        })
4118    }
4119}
4120
4121impl fmt::Debug for QualifiedThreadId {
4122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4123        f.debug_tuple("QualifiedThreadId")
4124            .field(&self.task.rep())
4125            .field(&self.thread.rep())
4126            .finish()
4127    }
4128}
4129
/// The execution state of a `GuestThread` (stored in `GuestThread::state`).
4130enum GuestThreadState {
    /// Initial state of a thread built with `GuestThread::new_implicit`.
4131    NotStartedImplicit,
    /// Initial state of a thread built with `GuestThread::new_explicit`;
    /// carries the start function supplied there.
4132    NotStartedExplicit(
4133        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4134    ),
    /// NOTE(review): presumably the thread is currently executing — confirm.
4135    Running,
    /// Carries a `StoreFiber`.
    /// NOTE(review): presumably the suspended fiber to resume later — confirm.
4136    Suspended(StoreFiber<'static>),
    /// NOTE(review): exact meaning is not visible in this part of the file —
    /// confirm against the scheduler.
4137    Pending,
    /// NOTE(review): presumably the thread has finished running — confirm.
4138    Completed,
4139}
/// Represents a guest thread belonging to a `GuestTask`.
4140pub struct GuestThread {
4141    /// Context-local state used to implement the `context.{get,set}`
4142    /// intrinsics.
4143    context: [u32; 2],
4144    /// The owning guest task.
4145    parent_task: TableId<GuestTask>,
4146    /// If present, indicates that the thread is currently waiting on the
4147    /// specified set but may be cancelled and woken immediately.
4148    wake_on_cancel: Option<TableId<WaitableSet>>,
4149    /// The execution state of this guest thread.
4150    state: GuestThreadState,
4151    /// The index of this thread in the component instance's handle table.
4152    /// This must always be `Some` after initialization; both constructors
    /// start it out as `None`.
4153    instance_rep: Option<u32>,
4154}
4155
4156impl GuestThread {
4157    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4158    /// handle.
4159    fn from_instance(
4160        state: Pin<&mut ComponentInstance>,
4161        caller_instance: RuntimeComponentInstanceIndex,
4162        guest_thread: u32,
4163    ) -> Result<TableId<Self>> {
4164        let rep = state.instance_states().0[caller_instance]
4165            .handle_table()
4166            .guest_thread_rep(guest_thread)?;
4167        Ok(TableId::new(rep))
4168    }
4169
4170    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4171        Self {
4172            context: [0; 2],
4173            parent_task,
4174            wake_on_cancel: None,
4175            state: GuestThreadState::NotStartedImplicit,
4176            instance_rep: None,
4177        }
4178    }
4179
4180    fn new_explicit(
4181        parent_task: TableId<GuestTask>,
4182        start_func: Box<
4183            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4184        >,
4185    ) -> Self {
4186        Self {
4187            context: [0; 2],
4188            parent_task,
4189            wake_on_cancel: None,
4190            state: GuestThreadState::NotStartedExplicit(start_func),
4191            instance_rep: None,
4192        }
4193    }
4194}
4195
/// Supplies the type name that `TableDebug` reports for `GuestThread` entries.
4196impl TableDebug for GuestThread {
4197    fn type_name() -> &'static str {
4198        "GuestThread"
4199    }
4200}
4201
/// Tracks the lowered result of a sync-to-async call (see
/// `GuestTask::sync_result`).
4202enum SyncResult {
    /// No result has been produced yet; `GuestTask::new` starts here.
4203    NotProduced,
    /// A result has been produced and is waiting to be taken; the payload is
    /// an optional `ValRaw`.
4204    Produced(Option<ValRaw>),
    /// `SyncResult::take` has already been called.
4205    Taken,
4206}
4207
4208impl SyncResult {
4209    fn take(&mut self) -> Option<Option<ValRaw>> {
4210        match mem::replace(self, SyncResult::Taken) {
4211            SyncResult::NotProduced => None,
4212            SyncResult::Produced(val) => Some(val),
4213            SyncResult::Taken => {
4214                panic!("attempted to take a synchronous result that was already taken")
4215            }
4216        }
4217    }
4218}
4219
/// State of the host future that represents an async task (see
/// `GuestTask::host_future_state`).
4220#[derive(Debug)]
4221enum HostFutureState {
    /// Chosen by `GuestTask::new` for guest callers and for host callers
    /// without a host future.
4222    NotApplicable,
    /// Chosen by `GuestTask::new` when the host caller has a host future;
    /// while in this state `GuestTask::ready_to_delete` returns `false`.
4223    Live,
    /// NOTE(review): presumably set once the host future has been dropped —
    /// confirm where this is assigned.
4224    Dropped,
4225}
4226
4227/// Represents a pending guest task.
4228pub(crate) struct GuestTask {
4229    /// See `WaitableCommon`
4230    common: WaitableCommon,
4231    /// Closure to lower the parameters passed to this task.
    ///
    /// Reset to `None` once the parameters have been lowered (see
    /// `already_lowered_parameters`).
4232    lower_params: Option<RawLower>,
4233    /// See `LiftResult`
    ///
    /// Reset to `None` once the task returns or exits (see
    /// `returned_or_cancelled`).
4234    lift_result: Option<LiftResult>,
4235    /// A place to stash the type-erased lifted result if it can't be delivered
4236    /// immediately.
4237    result: Option<LiftedResult>,
4238    /// Closure to call the callback function for an async-lifted export, if
4239    /// provided.
4240    callback: Option<CallbackFn>,
4241    /// See `Caller`
4242    caller: Caller,
4243    /// A place to stash the call context for managing resource borrows while
4244    /// switching between guest tasks.
4245    call_context: Option<CallContext>,
4246    /// A place to stash the lowered result for a sync-to-async call until it
4247    /// can be returned to the caller.
4248    sync_result: SyncResult,
4249    /// Whether or not the task has been cancelled (i.e. whether the task is
4250    /// permitted to call `task.cancel`).
4251    cancel_sent: bool,
4252    /// Whether or not we've sent a `Status::Starting` event to any current or
4253    /// future waiters for this waitable.
4254    starting_sent: bool,
4255    /// Pending guest subtasks created by this task (directly or indirectly).
4256    ///
4257    /// This is used to re-parent subtasks which are still running when their
4258    /// parent task is disposed.
4259    subtasks: HashSet<TableId<GuestTask>>,
4260    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// Allocated by `GuestTask::new` and deleted again by `GuestTask::dispose`.
4261    sync_call_set: TableId<WaitableSet>,
4262    /// The runtime instance to which the exported function for this guest task
4263    /// belongs.
4264    ///
4265    /// Note that the task may do a sync->sync call via a fused adapter which
4266    /// results in that task executing code in a different instance, and it may
4267    /// call host functions and intrinsics from that other instance.
4268    instance: RuntimeInstance,
4269    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4270    /// delivered to this task.
4271    event: Option<Event>,
4272    /// The `ExportIndex` of the guest function being called, if known.
4273    function_index: Option<ExportIndex>,
4274    /// Whether or not the task has exited.
4275    exited: bool,
4276    /// Threads belonging to this task.
    ///
    /// Must be empty by the time the task is disposed (asserted in
    /// `GuestTask::dispose`).
4277    threads: HashSet<TableId<GuestThread>>,
4278    /// The state of the host future that represents an async task, which must
4279    /// be dropped before we can delete the task.
4280    host_future_state: HostFutureState,
4281    /// Indicates whether this task was created for a call to an async-lifted
4282    /// export.
4283    async_function: bool,
4284}
4285
4286impl GuestTask {
4287    fn already_lowered_parameters(&self) -> bool {
4288        // We reset `lower_params` after we lower the parameters
4289        self.lower_params.is_none()
4290    }
4291
4292    fn returned_or_cancelled(&self) -> bool {
4293        // We reset `lift_result` after we return or exit
4294        self.lift_result.is_none()
4295    }
4296
4297    fn ready_to_delete(&self) -> bool {
4298        let threads_completed = self.threads.is_empty();
4299        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4300        let pending_completion_event = matches!(
4301            self.common.event,
4302            Some(Event::Subtask {
4303                status: Status::Returned | Status::ReturnCancelled
4304            })
4305        );
4306        let ready = threads_completed
4307            && !has_sync_result
4308            && !pending_completion_event
4309            && !matches!(self.host_future_state, HostFutureState::Live);
4310        log::trace!(
4311            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4312            threads_completed,
4313            has_sync_result,
4314            pending_completion_event,
4315            self.host_future_state
4316        );
4317        ready
4318    }
4319
4320    fn new(
4321        state: &mut ConcurrentState,
4322        lower_params: RawLower,
4323        lift_result: LiftResult,
4324        caller: Caller,
4325        callback: Option<CallbackFn>,
4326        component_instance: Instance,
4327        instance: RuntimeComponentInstanceIndex,
4328        async_function: bool,
4329    ) -> Result<Self> {
4330        let sync_call_set = state.push(WaitableSet::default())?;
4331        let host_future_state = match &caller {
4332            Caller::Guest { .. } => HostFutureState::NotApplicable,
4333            Caller::Host {
4334                host_future_present,
4335                ..
4336            } => {
4337                if *host_future_present {
4338                    HostFutureState::Live
4339                } else {
4340                    HostFutureState::NotApplicable
4341                }
4342            }
4343        };
4344        Ok(Self {
4345            common: WaitableCommon::default(),
4346            lower_params: Some(lower_params),
4347            lift_result: Some(lift_result),
4348            result: None,
4349            callback,
4350            caller,
4351            call_context: Some(CallContext::default()),
4352            sync_result: SyncResult::NotProduced,
4353            cancel_sent: false,
4354            starting_sent: false,
4355            subtasks: HashSet::new(),
4356            sync_call_set,
4357            instance: RuntimeInstance {
4358                instance: component_instance.id().instance(),
4359                index: instance,
4360            },
4361            event: None,
4362            function_index: None,
4363            exited: false,
4364            threads: HashSet::new(),
4365            host_future_state,
4366            async_function,
4367        })
4368    }
4369
4370    /// Dispose of this guest task, reparenting any pending subtasks to the
4371    /// caller.
4372    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4373        // If there are not-yet-delivered completion events for subtasks in
4374        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4375        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4376            if let Some(Event::Subtask {
4377                status: Status::Returned | Status::ReturnCancelled,
4378            }) = waitable.common(state)?.event
4379            {
4380                waitable.delete_from(state)?;
4381            }
4382        }
4383
4384        assert!(self.threads.is_empty());
4385
4386        state.delete(self.sync_call_set)?;
4387
4388        // Reparent any pending subtasks to the caller.
4389        match &self.caller {
4390            Caller::Guest {
4391                thread,
4392                instance: runtime_instance,
4393            } => {
4394                let task_mut = state.get_mut(thread.task)?;
4395                let present = task_mut.subtasks.remove(&me);
4396                assert!(present);
4397
4398                for subtask in &self.subtasks {
4399                    task_mut.subtasks.insert(*subtask);
4400                }
4401
4402                for subtask in &self.subtasks {
4403                    state.get_mut(*subtask)?.caller = Caller::Guest {
4404                        thread: *thread,
4405                        instance: *runtime_instance,
4406                    };
4407                }
4408            }
4409            Caller::Host { exit_tx, .. } => {
4410                for subtask in &self.subtasks {
4411                    state.get_mut(*subtask)?.caller = Caller::Host {
4412                        tx: None,
4413                        // Clone `exit_tx` to ensure that it is only dropped
4414                        // once all transitive subtasks of the host call have
4415                        // exited:
4416                        exit_tx: exit_tx.clone(),
4417                        host_future_present: false,
4418                        call_post_return_automatically: true,
4419                    };
4420                }
4421            }
4422        }
4423
4424        for subtask in self.subtasks {
4425            if state.get_mut(subtask)?.exited {
4426                Waitable::Guest(subtask).delete_from(state)?;
4427            }
4428        }
4429
4430        Ok(())
4431    }
4432
4433    fn call_post_return_automatically(&self) -> bool {
4434        matches!(
4435            self.caller,
4436            Caller::Guest { .. }
4437                | Caller::Host {
4438                    call_post_return_automatically: true,
4439                    ..
4440                }
4441        )
4442    }
4443}
4444
/// Supplies the type name that `TableDebug` reports for `GuestTask` entries.
4445impl TableDebug for GuestTask {
4446    fn type_name() -> &'static str {
4447        "GuestTask"
4448    }
4449}
4450
4451/// Represents state common to all kinds of waitables.
///
/// Embedded as the `common` field of `HostTask` and `GuestTask`, and reached
/// generically through `Waitable::common`.
4452#[derive(Default)]
4453struct WaitableCommon {
4454    /// The currently pending event for this waitable, if any.
4455    event: Option<Event>,
4456    /// The set to which this waitable belongs, if any.
4457    set: Option<TableId<WaitableSet>>,
4458    /// The handle with which the guest refers to this waitable, if any.
4459    handle: Option<u32>,
4460}
4461
4462/// Represents a Component Model Async `waitable`.
///
/// Each variant wraps the table id of the underlying entry. The derived
/// ordering is what orders entries in `WaitableSet::ready`.
4463#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4464enum Waitable {
4465    /// A host task
4466    Host(TableId<HostTask>),
4467    /// A guest task
4468    Guest(TableId<GuestTask>),
4469    /// The read or write end of a stream or future
4470    Transmit(TableId<TransmitHandle>),
4471}
4472
4473impl Waitable {
4474    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4475    /// handle.
4476    fn from_instance(
4477        state: Pin<&mut ComponentInstance>,
4478        caller_instance: RuntimeComponentInstanceIndex,
4479        waitable: u32,
4480    ) -> Result<Self> {
4481        use crate::runtime::vm::component::Waitable;
4482
4483        let (waitable, kind) = state.instance_states().0[caller_instance]
4484            .handle_table()
4485            .waitable_rep(waitable)?;
4486
4487        Ok(match kind {
4488            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4489            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4490            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4491        })
4492    }
4493
4494    /// Retrieve the host-visible identifier for this `Waitable`.
4495    fn rep(&self) -> u32 {
4496        match self {
4497            Self::Host(id) => id.rep(),
4498            Self::Guest(id) => id.rep(),
4499            Self::Transmit(id) => id.rep(),
4500        }
4501    }
4502
4503    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4504    /// remove it from any set it may currently belong to (when `set` is
4505    /// `None`).
4506    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4507        log::trace!("waitable {self:?} join set {set:?}",);
4508
4509        let old = mem::replace(&mut self.common(state)?.set, set);
4510
4511        if let Some(old) = old {
4512            match *self {
4513                Waitable::Host(id) => state.remove_child(id, old),
4514                Waitable::Guest(id) => state.remove_child(id, old),
4515                Waitable::Transmit(id) => state.remove_child(id, old),
4516            }?;
4517
4518            state.get_mut(old)?.ready.remove(self);
4519        }
4520
4521        if let Some(set) = set {
4522            match *self {
4523                Waitable::Host(id) => state.add_child(id, set),
4524                Waitable::Guest(id) => state.add_child(id, set),
4525                Waitable::Transmit(id) => state.add_child(id, set),
4526            }?;
4527
4528            if self.common(state)?.event.is_some() {
4529                self.mark_ready(state)?;
4530            }
4531        }
4532
4533        Ok(())
4534    }
4535
4536    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4537    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4538        Ok(match self {
4539            Self::Host(id) => &mut state.get_mut(*id)?.common,
4540            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4541            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4542        })
4543    }
4544
4545    /// Set or clear the pending event for this waitable and either deliver it
4546    /// to the first waiter, if any, or mark it as ready to be delivered to the
4547    /// next waiter that arrives.
4548    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4549        log::trace!("set event for {self:?}: {event:?}");
4550        self.common(state)?.event = event;
4551        self.mark_ready(state)
4552    }
4553
4554    /// Take the pending event from this waitable, leaving `None` in its place.
4555    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4556        let common = self.common(state)?;
4557        let event = common.event.take();
4558        if let Some(set) = self.common(state)?.set {
4559            state.get_mut(set)?.ready.remove(self);
4560        }
4561
4562        Ok(event)
4563    }
4564
4565    /// Deliver the current event for this waitable to the first waiter, if any,
4566    /// or else mark it as ready to be delivered to the next waiter that
4567    /// arrives.
4568    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4569        if let Some(set) = self.common(state)?.set {
4570            state.get_mut(set)?.ready.insert(*self);
4571            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4572                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4573                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4574
4575                let item = match mode {
4576                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4577                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4578                        thread,
4579                        kind: GuestCallKind::DeliverEvent {
4580                            instance,
4581                            set: Some(set),
4582                        },
4583                    }),
4584                };
4585                state.push_high_priority(item);
4586            }
4587        }
4588        Ok(())
4589    }
4590
4591    /// Remove this waitable from the instance's rep table.
4592    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4593        match self {
4594            Self::Host(task) => {
4595                log::trace!("delete host task {task:?}");
4596                state.delete(*task)?;
4597            }
4598            Self::Guest(task) => {
4599                log::trace!("delete guest task {task:?}");
4600                state.delete(*task)?.dispose(state, *task)?;
4601            }
4602            Self::Transmit(task) => {
4603                state.delete(*task)?;
4604            }
4605        }
4606
4607        Ok(())
4608    }
4609}
4610
4611impl fmt::Debug for Waitable {
4612    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4613        match self {
4614            Self::Host(id) => write!(f, "{id:?}"),
4615            Self::Guest(id) => write!(f, "{id:?}"),
4616            Self::Transmit(id) => write!(f, "{id:?}"),
4617        }
4618    }
4619}
4620
/// Represents a Component Model Async `waitable-set`.
///
/// The default value is an empty set with no ready waitables and no waiters.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ///
    /// Entries are inserted by `Waitable::mark_ready`.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any, along
    /// with how each one should be woken (by resuming a fiber or by
    /// delivering an event to a callback).
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4629
impl TableDebug for WaitableSet {
    /// Human-readable name for this type of table entry.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4635
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure may be invoked at most once (`FnOnce`); it receives the store
/// and a slice of not-yet-initialized `ValRaw` slots.
// NOTE(review): presumably the closure fills the slots with the lowered
// parameters -- confirm against the call site.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4639
/// Type-erased closure to lift the result for a guest task.
///
/// The closure may be invoked at most once (`FnOnce`); it receives the store
/// and the raw core Wasm values, and yields the lifted result boxed as
/// `dyn Any` (the same shape as `LiftedResult`).
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4644
/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;
4649
/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
///
/// This is a zero-sized placeholder: it carries no data of its own.
struct DummyResult;
4653
/// Represents the Component Model Async state of a (sub-)component instance.
///
/// The default value has backpressure disabled, `do_not_enter` cleared, and
/// no pending calls.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Backpressure counter for this instance; backpressure is considered
    /// enabled while this is greater than zero.
    backpressure: u16,
    /// Flag controlling whether this instance may be entered; judging by the
    /// name, `true` means it may *not* currently be entered.
    do_not_enter: bool,
    /// Calls targeting this instance which cannot proceed yet, keyed by the
    /// guest thread which will run them.
    ///
    /// NOTE(review): an earlier version of this comment said these calls wait
    /// for `Self::backpressure` to be `true`, but that field is a counter.
    /// Presumably they wait until backpressure is disabled (i.e. the counter
    /// is zero) and/or `Self::do_not_enter` is `false` -- confirm against the
    /// code which drains this map.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4665
impl ConcurrentInstanceState {
    /// Returns `true` if no calls are currently queued in `Self::pending`
    /// for this instance.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4671
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running guest thread, if any.
    guest_thread: Option<QualifiedThreadId>,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `StoreContextMut::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    /// It is also `None` after `Self::take_fibers_and_futures` has run.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    low_priority: Vec<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4714
4715impl Default for ConcurrentState {
4716    fn default() -> Self {
4717        Self {
4718            guest_thread: None,
4719            table: AlwaysMut::new(ResourceTable::new()),
4720            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4721            high_priority: Vec::new(),
4722            low_priority: Vec::new(),
4723            suspend_reason: None,
4724            worker: None,
4725            worker_item: None,
4726            global_error_context_ref_counts: BTreeMap::new(),
4727        }
4728    }
4729}
4730
4731impl ConcurrentState {
4732    /// Take ownership of any fibers and futures owned by this object.
4733    ///
4734    /// This should be used when disposing of the `Store` containing this object
4735    /// in order to gracefully resolve any and all fibers using
4736    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4737    /// use-after-free bugs due to fibers which may still have access to the
4738    /// `Store`.
4739    ///
4740    /// Additionally, the futures collected with this function should be dropped
4741    /// within a `tls::set` call, which will ensure than any futures closing
4742    /// over an `&Accessor` will have access to the store when dropped, allowing
4743    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4744    /// panicking.
4745    ///
4746    /// Note that this will leave the object in an inconsistent and unusable
4747    /// state, so it should only be used just prior to dropping it.
4748    pub(crate) fn take_fibers_and_futures(
4749        &mut self,
4750        fibers: &mut Vec<StoreFiber<'static>>,
4751        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4752    ) {
4753        for entry in self.table.get_mut().iter_mut() {
4754            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4755                for mode in mem::take(&mut set.waiting).into_values() {
4756                    if let WaitMode::Fiber(fiber) = mode {
4757                        fibers.push(fiber);
4758                    }
4759                }
4760            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4761                if let GuestThreadState::Suspended(fiber) =
4762                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4763                {
4764                    fibers.push(fiber);
4765                }
4766            }
4767        }
4768
4769        if let Some(fiber) = self.worker.take() {
4770            fibers.push(fiber);
4771        }
4772
4773        let mut take_items = |list| {
4774            for item in mem::take(list) {
4775                match item {
4776                    WorkItem::ResumeFiber(fiber) => {
4777                        fibers.push(fiber);
4778                    }
4779                    WorkItem::PushFuture(future) => {
4780                        self.futures
4781                            .get_mut()
4782                            .as_mut()
4783                            .unwrap()
4784                            .push(future.into_inner());
4785                    }
4786                    _ => {}
4787                }
4788            }
4789        };
4790
4791        take_items(&mut self.high_priority);
4792        take_items(&mut self.low_priority);
4793
4794        if let Some(them) = self.futures.get_mut().take() {
4795            futures.push(them);
4796        }
4797    }
4798
4799    fn push<V: Send + Sync + 'static>(
4800        &mut self,
4801        value: V,
4802    ) -> Result<TableId<V>, ResourceTableError> {
4803        self.table.get_mut().push(value).map(TableId::from)
4804    }
4805
4806    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4807        self.table.get_mut().get_mut(&Resource::from(id))
4808    }
4809
4810    pub fn add_child<T: 'static, U: 'static>(
4811        &mut self,
4812        child: TableId<T>,
4813        parent: TableId<U>,
4814    ) -> Result<(), ResourceTableError> {
4815        self.table
4816            .get_mut()
4817            .add_child(Resource::from(child), Resource::from(parent))
4818    }
4819
4820    pub fn remove_child<T: 'static, U: 'static>(
4821        &mut self,
4822        child: TableId<T>,
4823        parent: TableId<U>,
4824    ) -> Result<(), ResourceTableError> {
4825        self.table
4826            .get_mut()
4827            .remove_child(Resource::from(child), Resource::from(parent))
4828    }
4829
4830    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4831        self.table.get_mut().delete(Resource::from(id))
4832    }
4833
    /// Schedule `future` to be added to the set of host task futures.
    ///
    /// The future is not added immediately; it is wrapped in a
    /// `WorkItem::PushFuture` and placed on the high-priority queue.
    fn push_future(&mut self, future: HostTaskFuture) {
        // Note that we can't directly push to `ConcurrentState::futures` here
        // since this may be called from a future that's being polled inside
        // `StoreContextMut::poll_until`, which temporarily removes the
        // `FuturesUnordered` so it has exclusive access while polling it.
        // Therefore, we push a work item to the "high priority" queue, which
        // will actually push to `ConcurrentState::futures` later.
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }
4843
    /// Append `item` to the end of the "high priority" work queue, emitting
    /// a trace-level log line first.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }
4848
    /// Append `item` to the end of the "low priority" work queue, emitting
    /// a trace-level log line first.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push(item);
    }
4853
4854    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4855        if high_priority {
4856            self.push_high_priority(item);
4857        } else {
4858            self.push_low_priority(item);
4859        }
4860    }
4861
4862    /// Determine whether the instance associated with the specified guest task
4863    /// may be entered (i.e. is not already on the async call stack).
4864    ///
4865    /// This is an additional check on top of the "may_enter" instance flag;
4866    /// it's needed because async-lifted exports with callback functions must
4867    /// not call their own instances directly or indirectly, and due to the
4868    /// "stackless" nature of callback-enabled guest tasks this may happen even
4869    /// if there are no activation records on the stack (i.e. the "may_enter"
4870    /// field is `true`) for that instance.
4871    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4872        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4873
4874        // Walk the task tree back to the root, looking for potential
4875        // reentrance.
4876        //
4877        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4878        // such that each bit represents and instance which has been entered by
4879        // that task or an ancestor of that task, in which case this would be a
4880        // constant time check.
4881        loop {
4882            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4883                Caller::Host { .. } => break true,
4884                Caller::Guest { thread, instance } => {
4885                    if *instance == guest_instance.index {
4886                        break false;
4887                    } else {
4888                        *thread
4889                    }
4890                }
4891            };
4892            guest_task = next_thread.task;
4893        }
4894    }
4895
4896    /// Implements the `context.get` intrinsic.
4897    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4898        let thread = self.guest_thread.unwrap();
4899        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4900        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4901        Ok(val)
4902    }
4903
4904    /// Implements the `context.set` intrinsic.
4905    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4906        let thread = self.guest_thread.unwrap();
4907        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4908        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4909        Ok(())
4910    }
4911
4912    /// Returns whether there's a pending cancellation on the current guest thread,
4913    /// consuming the event if so.
4914    fn take_pending_cancellation(&mut self) -> bool {
4915        let thread = self.guest_thread.unwrap();
4916        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4917            assert!(matches!(event, Event::Cancelled));
4918            true
4919        } else {
4920            false
4921        }
4922    }
4923
4924    fn check_blocking(&mut self) -> Result<()> {
4925        let task = self.guest_thread.unwrap().task;
4926        self.check_blocking_for(task)
4927    }
4928
4929    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4930        if self.may_block(task) {
4931            Ok(())
4932        } else {
4933            Err(Trap::CannotBlockSyncTask.into())
4934        }
4935    }
4936
4937    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4938        let task = self.get_mut(task).unwrap();
4939        task.async_function || task.returned_or_cancelled()
4940    }
4941}
4942
4943/// Provide a type hint to compiler about the shape of a parameter lower
4944/// closure.
4945fn for_any_lower<
4946    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4947>(
4948    fun: F,
4949) -> F {
4950    fun
4951}
4952
4953/// Provide a type hint to compiler about the shape of a result lift closure.
4954fn for_any_lift<
4955    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4956>(
4957    fun: F,
4958) -> F {
4959    fun
4960}
4961
4962/// Wrap the specified future in a `poll_fn` which asserts that the future is
4963/// only polled from the event loop of the specified `Store`.
4964///
4965/// See `StoreContextMut::run_concurrent` for details.
4966fn checked<F: Future + Send + 'static>(
4967    id: StoreId,
4968    fut: F,
4969) -> impl Future<Output = F::Output> + Send + 'static {
4970    async move {
4971        let mut fut = pin!(fut);
4972        future::poll_fn(move |cx| {
4973            let message = "\
4974                `Future`s which depend on asynchronous component tasks, streams, or \
4975                futures to complete may only be polled from the event loop of the \
4976                store to which they belong.  Please use \
4977                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4978            ";
4979            tls::try_get(|store| {
4980                let matched = match store {
4981                    tls::TryGet::Some(store) => store.id() == id,
4982                    tls::TryGet::Taken | tls::TryGet::None => false,
4983                };
4984
4985                if !matched {
4986                    panic!("{message}")
4987                }
4988            });
4989            fut.as_mut().poll(cx)
4990        })
4991        .await
4992    }
4993}
4994
4995/// Assert that `StoreContextMut::run_concurrent` has not been called from
4996/// within an store's event loop.
4997fn check_recursive_run() {
4998    tls::try_get(|store| {
4999        if !matches!(store, tls::TryGet::None) {
5000            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5001        }
5002    });
5003}
5004
/// Split a packed callback return value into `(low 4 bits, upper 28 bits)`.
// NOTE(review): presumably the low nibble is the callback code and the upper
// bits are its payload (e.g. a waitable-set index) -- confirm against the
// caller.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low_nibble = code & 0xF;
    let upper_bits = code >> 4;
    (low_nibble, upper_bits)
}
5008
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the call.
    options: OptionsIndex,
    /// Raw `u32` argument accompanying the call.
    // NOTE(review): presumably the guest address at which the event payload
    // is to be written -- confirm against `waitable_check`.
    payload: u32,
}
5017
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The call is a `waitable-set.wait`.
    Wait,
    /// The call is a `waitable-set.poll`.
    Poll,
}
5024
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// A `PreparedCall` is consumed by `queue_call`, which enqueues it and yields
/// a future for the result.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The `oneshot::Receiver` which will resolve when the task -- and any
    /// transitive subtasks -- have all exited.
    exit_rx: oneshot::Receiver<()>,
    /// Records the result type `R`; `queue_call` downcasts the type-erased
    /// result to this type.
    _phantom: PhantomData<R>,
}
5041
5042impl<R> PreparedCall<R> {
5043    /// Get a copy of the `TaskId` for this `PreparedCall`.
5044    pub(crate) fn task_id(&self) -> TaskId {
5045        TaskId {
5046            task: self.thread.task,
5047        }
5048    }
5049}
5050
/// Represents a task created by `prepare_call`.
///
/// Obtained from `PreparedCall::task_id`.
pub(crate) struct TaskId {
    /// Table id of the underlying guest task.
    task: TableId<GuestTask>,
}
5055
5056impl TaskId {
5057    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5058    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5059    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5060    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5061    /// and delete the task when all threads are done.
5062    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5063        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5064        if !task.already_lowered_parameters() {
5065            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5066        } else {
5067            task.host_future_state = HostFutureState::Dropped;
5068            if task.ready_to_delete() {
5069                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5070            }
5071        }
5072        Ok(())
5073    }
5074}
5075
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// This creates a new `GuestTask` (with a host caller) plus an implicit
/// `GuestThread` for it, registers both in the store's concurrent state, and
/// returns a `PreparedCall` describing them.  Nothing is scheduled yet.
///
/// - `handle`: the guest export to call.
/// - `param_count`: recorded in the returned `PreparedCall`.
/// - `host_future_present` and `call_post_return_automatically`: stored in
///   the task's `Caller::Host`; the latter is also forwarded to the callback
///   invocation, if the function has a callback.
/// - `lower_params` / `lift_result`: wrapped into the task's type-erased
///   lowering and lifting closures.
///
/// Returns an error if creating the task or inserting into the table fails.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    call_post_return_automatically: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything needed from the instance up front, before the store
    // is borrowed mutably below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    // The token lets the type-erased closures below recover a typed
    // `StoreContextMut<T>` from a `&mut dyn VMStore`.
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // `tx`/`rx` carries the lifted result; `exit_tx`/`exit_rx` is the channel
    // whose receiving half is documented on `PreparedCall::exit_rx`.
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();

    let mut task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            host_future_present,
            call_post_return_automatically,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(
                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
                    let store = token.as_context_mut(store);
                    // SAFETY: Per the contract of `prepare_call`, the callback
                    // will remain valid at least as long as this task exists.
                    unsafe {
                        instance.call_callback(
                            store,
                            runtime_instance,
                            callback,
                            event,
                            handle,
                            call_post_return_automatically,
                        )
                    }
                },
            ) as CallbackFn
        }),
        handle.instance(),
        component_instance,
        async_function,
    )?;
    task.function_index = Some(handle.index());

    // Register the task, then its implicit thread, and link the thread back
    // into the task's thread set.
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
5175
5176/// Queue a call previously prepared using `prepare_call` to be run as part of
5177/// the associated `ComponentInstance`'s event loop.
5178///
5179/// The returned future will resolve to the result once it is available, but
5180/// must only be polled via the instance's event loop. See
5181/// `StoreContextMut::run_concurrent` for details.
5182pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5183    mut store: StoreContextMut<T>,
5184    prepared: PreparedCall<R>,
5185) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5186    let PreparedCall {
5187        handle,
5188        thread,
5189        param_count,
5190        rx,
5191        exit_rx,
5192        ..
5193    } = prepared;
5194
5195    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5196
5197    Ok(checked(
5198        store.0.id(),
5199        rx.map(move |result| {
5200            result
5201                .map(|v| (*v.downcast().unwrap(), exit_rx))
5202                .map_err(anyhow::Error::from)
5203        }),
5204    ))
5205}
5206
5207/// Queue a call previously prepared using `prepare_call` to be run as part of
5208/// the associated `ComponentInstance`'s event loop.
5209fn queue_call0<T: 'static>(
5210    store: StoreContextMut<T>,
5211    handle: Func,
5212    guest_thread: QualifiedThreadId,
5213    param_count: usize,
5214) -> Result<()> {
5215    let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5216    let is_concurrent = raw_options.async_;
5217    let callback = raw_options.callback;
5218    let instance = handle.instance();
5219    let callee = handle.lifted_core_func(store.0);
5220    let post_return = handle.post_return_core_func(store.0);
5221    let callback = callback.map(|i| {
5222        let instance = instance.id().get(store.0);
5223        SendSyncPtr::new(instance.runtime_callback(i))
5224    });
5225
5226    log::trace!("queueing call {guest_thread:?}");
5227
5228    let instance_flags = if callback.is_none() {
5229        None
5230    } else {
5231        Some(flags)
5232    };
5233
5234    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5235    // (with signatures appropriate for this call) and will remain valid as
5236    // long as this instance is valid.
5237    unsafe {
5238        instance.queue_call(
5239            store,
5240            guest_thread,
5241            SendSyncPtr::new(callee),
5242            param_count,
5243            1,
5244            instance_flags,
5245            is_concurrent,
5246            callback,
5247            post_return.map(SendSyncPtr::new),
5248        )
5249    }
5250}