wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
//! - `StoreContextMut::run_concurrent`: Run the event loop for the store,
//! allowing any and all tasks belonging to that store to make progress.
//!
//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
//! for the store.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func};
54use crate::component::store::StoreComponentInstanceId;
55use crate::component::{
56    ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{
61    CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62};
63use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64use crate::{
65    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
66    bail,
67    error::{Context as _, format_err},
68};
69use error_contexts::GlobalErrorContextRefCount;
70use futures::channel::oneshot;
71use futures::future::{self, Either, FutureExt};
72use futures::stream::{FuturesUnordered, StreamExt};
73use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
74use std::any::Any;
75use std::borrow::ToOwned;
76use std::boxed::Box;
77use std::cell::UnsafeCell;
78use std::collections::{BTreeMap, BTreeSet, HashSet};
79use std::fmt;
80use std::future::Future;
81use std::marker::PhantomData;
82use std::mem::{self, ManuallyDrop, MaybeUninit};
83use std::ops::DerefMut;
84use std::pin::{Pin, pin};
85use std::ptr::{self, NonNull};
86use std::slice;
87use std::sync::Arc;
88use std::task::{Context, Poll, Waker};
89use std::vec::Vec;
90use table::{TableDebug, TableId};
91use wasmtime_environ::Trap;
92use wasmtime_environ::component::{
93    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
94    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
95    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
96    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
97    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
98};
99
100pub use abort::JoinHandle;
101pub use future_stream_any::{FutureAny, StreamAny};
102pub use futures_and_streams::{
103    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
104    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
105    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
106};
107pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
108
109mod abort;
110mod error_contexts;
111mod future_stream_any;
112mod futures_and_streams;
113pub(crate) mod table;
114pub(crate) mod tls;
115
/// Sentinel from the Component Model spec signalling that an async intrinsic
/// (e.g. `future.write`) has not completed yet.  All 32 bits are set.
const BLOCKED: u32 = u32::MAX;
119
/// Corresponds to `CallState` in the upstream spec.
///
/// The explicit discriminants are the integer values handed to the guest;
/// [`Status::pack`] uses them verbatim via an `as u32` cast.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Encodes this status together with an optional `waitable` handle as the
    /// single 32-bit value required by the canonical ABI.
    ///
    /// Layout: bits 0..4 carry the status discriminant and bits 4..32 carry
    /// the waitable handle (zero when absent).
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied for `Status::Returned`, if no waitable
    /// is supplied for any other status, or if the waitable does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // `Returned` is the only status that travels without a handle.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave the low four bits free for the status.
        assert!(waitable < (1 << 28));
        let status = self as u32;
        status | (waitable << 4)
    }
}
143
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// Only an event code and a single `u32` payload are lowered for the guest
/// (see `Event::parts`); the `pending` fields of the stream/future variants
/// are ignored by that lowering.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a zero payload.
    None,
    /// Cancellation notice; lowered with a zero payload.
    Cancelled,
    /// A subtask changed state; the payload is the numeric `Status`.
    Subtask {
        status: Status,
    },
    /// A stream read event; the payload is the encoded `ReturnCode`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably identifies a stream handle (table type +
        // index) with follow-up work outstanding -- confirm against the code
        // that constructs and consumes this field.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write event; the payload is the encoded `ReturnCode`.
    StreamWrite {
        code: ReturnCode,
        // NOTE(review): see `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read event; the payload is the encoded `ReturnCode`.
    FutureRead {
        code: ReturnCode,
        // NOTE(review): future-typed analogue of `StreamRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write event; the payload is the encoded `ReturnCode`.
    FutureWrite {
        code: ReturnCode,
        // NOTE(review): future-typed analogue of `StreamRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
170
171impl Event {
172    /// Lower this event to core Wasm integers for delivery to the guest.
173    ///
174    /// Note that the waitable handle, if any, is assumed to be lowered
175    /// separately.
176    fn parts(self) -> (u32, u32) {
177        const EVENT_NONE: u32 = 0;
178        const EVENT_SUBTASK: u32 = 1;
179        const EVENT_STREAM_READ: u32 = 2;
180        const EVENT_STREAM_WRITE: u32 = 3;
181        const EVENT_FUTURE_READ: u32 = 4;
182        const EVENT_FUTURE_WRITE: u32 = 5;
183        const EVENT_CANCELLED: u32 = 6;
184        match self {
185            Event::None => (EVENT_NONE, 0),
186            Event::Cancelled => (EVENT_CANCELLED, 0),
187            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
188            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
189            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
190            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
191            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
192        }
193    }
194}
195
/// Corresponds to `CallbackCode` in the spec.
///
/// NOTE(review): the per-constant descriptions below follow the spec's naming
/// for the codes an async callback may return; confirm against the current
/// Canonical ABI text.
mod callback_code {
    /// The task is finished.
    pub const EXIT: u32 = 0;
    /// The task yields so other tasks may run before it is resumed.
    pub const YIELD: u32 = 1;
    /// The task waits for an event on a waitable set.
    pub const WAIT: u32 = 2;
}
202
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// This is `wasmtime_environ`'s constant of the same name, cast to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
207
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    // The store this access is scoped to, borrowed for `'a`.
    store: StoreContextMut<'a, T>,
    // Projection from the store's `T` to `D::Data`; applied by `Access::get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
217
218impl<'a, T, D> Access<'a, T, D>
219where
220    D: HasData + ?Sized,
221    T: 'static,
222{
223    /// Creates a new [`Access`] from its component parts.
224    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
225        Self { store, get_data }
226    }
227
228    /// Get mutable access to the store data.
229    pub fn data_mut(&mut self) -> &mut T {
230        self.store.data_mut()
231    }
232
233    /// Get mutable access to the store data.
234    pub fn get(&mut self) -> D::Data<'_> {
235        (self.get_data)(self.data_mut())
236    }
237
238    /// Spawn a background task.
239    ///
240    /// See [`Accessor::spawn`] for details.
241    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
242    where
243        T: 'static,
244    {
245        let accessor = Accessor {
246            get_data: self.get_data,
247            token: StoreToken::new(self.store.as_context_mut()),
248        };
249        self.store
250            .as_context_mut()
251            .spawn_with_accessor(accessor, task)
252    }
253
254    /// Returns the getter this accessor is using to project from `T` into
255    /// `D::Data`.
256    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
257        self.get_data
258    }
259}
260
/// Exposes the underlying store immutably by delegating to the wrapped
/// `StoreContextMut`.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
272
/// Exposes the underlying store mutably by reborrowing the wrapped
/// `StoreContextMut`.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
282
/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they may panic or have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    // Used by `Accessor::with` to turn the store obtained from TLS into a
    // `StoreContextMut<T>`.
    token: StoreToken<T>,
    // Projection from the store's `T` to `D::Data`, handed to each `Access`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
349
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`], for example, or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The returned reference borrows from `self`.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
377
378impl<T: AsAccessor + ?Sized> AsAccessor for &T {
379    type Data = T::Data;
380    type AccessorData = T::AccessorData;
381
382    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
383        T::as_accessor(self)
384    }
385}
386
/// An [`Accessor`] trivially refers to itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
395
396// Note that it is intentional at this time that `Accessor` does not actually
397// store `&mut T` or anything similar. This distinctly enables the `Accessor`
398// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
399// that matter). This is used to ergonomically simplify bindings where the
400// majority of the time `Accessor` is closed over in a future which then needs
401// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
402// you already have to write `T: 'static`...) it helps to avoid this.
403//
404// Note as well that `Accessor` doesn't actually store its data at all. Instead
405// it's more of a "proof" of what can be accessed from TLS. API design around
406// `Accessor` and functions like `Linker::func_wrap_concurrent` are
407// intentionally made to ensure that `Accessor` is ideally only used in the
408// context that TLS variables are actually set. For example host functions are
409// given `&Accessor`, not `Accessor`, and this prevents them from persisting
410// the value outside of a future. Within the future the TLS variables are all
411// guaranteed to be set while the future is being polled.
412//
413// Finally though this is not an ironclad guarantee, but nor does it need to be.
414// The TLS APIs are designed to panic or otherwise model usage where they're
415// called recursively or similar. It's hoped that code cannot be constructed to
416// actually hit this at runtime but this is not a safety requirement at this
417// time.
const _: () = {
    // Compile-time check only; `assert` has no runtime effect.
    const fn assert<T: Send + Sync>() {}
    // `UnsafeCell<u32>` is not `Sync`, so this stops compiling if
    // `Accessor<T>: Sync` ever starts depending on `T: Sync`.
    assert::<Accessor<UnsafeCell<u32>>>();
};
422
impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by `token`.
    ///
    /// - `token`: used by [`Accessor::with`] to obtain a typed
    /// `StoreContextMut<T>` from the store found in TLS.
    ///
    /// The returned accessor uses the identity projection, i.e.
    /// [`Access::get`] yields the store's `T` itself.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
439
440impl<T, D> Accessor<T, D>
441where
442    D: HasData + ?Sized,
443{
444    /// Run the specified closure, passing it mutable access to the store.
445    ///
446    /// This function is one of the main building blocks of the [`Accessor`]
447    /// type. This yields synchronous, blocking, access to store via an
448    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
449    /// providing the ability to access `D` via [`Access::get`]. Note that the
450    /// `fun` here is given only temporary access to the store and `T`/`D`
451    /// meaning that the return value `R` here is not allowed to capture borrows
452    /// into the two. If access is needed to data within `T` or `D` outside of
453    /// this closure then it must be `clone`d out, for example.
454    ///
455    /// # Panics
456    ///
457    /// This function will panic if it is call recursively with any other
458    /// accessor already in scope. For example if `with` is called within `fun`,
459    /// then this function will panic. It is up to the embedder to ensure that
460    /// this does not happen.
461    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
462        tls::get(|vmstore| {
463            fun(Access {
464                store: self.token.as_context_mut(vmstore),
465                get_data: self.get_data,
466            })
467        })
468    }
469
470    /// Returns the getter this accessor is using to project from `T` into
471    /// `D::Data`.
472    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
473        self.get_data
474    }
475
476    /// Changes this accessor to access `D2` instead of the current type
477    /// parameter `D`.
478    ///
479    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
480    ///
481    /// # Panics
482    ///
483    /// When using this API the returned value is disconnected from `&self` and
484    /// the lifetime binding the `self` argument. An `Accessor` only works
485    /// within the context of the closure or async closure that it was
486    /// originally given to, however. This means that due to the fact that the
487    /// returned value has no lifetime connection it's possible to use the
488    /// accessor outside of `&self`, the original accessor, and panic.
489    ///
490    /// The returned value should only be used within the scope of the original
491    /// `Accessor` that `self` refers to.
492    pub fn with_getter<D2: HasData>(
493        &self,
494        get_data: fn(&mut T) -> D2::Data<'_>,
495    ) -> Accessor<T, D2> {
496        Accessor {
497            token: self.token,
498            get_data,
499        }
500    }
501
502    /// Spawn a background task which will receive an `&Accessor<T, D>` and
503    /// run concurrently with any other tasks in progress for the current
504    /// store.
505    ///
506    /// This is particularly useful for host functions which return a `stream`
507    /// or `future` such that the code to write to the write end of that
508    /// `stream` or `future` must run after the function returns.
509    ///
510    /// The returned [`JoinHandle`] may be used to cancel the task.
511    ///
512    /// # Panics
513    ///
514    /// Panics if called within a closure provided to the [`Accessor::with`]
515    /// function. This can only be called outside an active invocation of
516    /// [`Accessor::with`].
517    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
518    where
519        T: 'static,
520    {
521        let accessor = self.clone_for_spawn();
522        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
523    }
524
525    fn clone_for_spawn(&self) -> Self {
526        Self {
527            token: self.token,
528            get_data: self.get_data,
529        }
530    }
531}
532
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and returns a `Send` future which resolves to
    /// `Ok(())` on success or an error on failure. The `accessor` provides
    /// access to the store while that future is being polled.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
551
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        // Raw core Wasm values supplied by the caller.
        // NOTE(review): presumably the flattened parameters (or a pointer to
        // them) -- confirm against the code that builds this.
        params: Vec<ValRaw>,
        // Whether the call produces a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        // Raw core Wasm values supplied by the caller (see `Async::params`).
        params: Vec<ValRaw>,
        // Number of results the caller expects.
        // NOTE(review): presumably counted in flattened core values -- confirm.
        result_count: u32,
    },
}
566
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// Carries the fiber associated with the wait.
    // NOTE(review): presumably the suspended fiber to resume once an event
    // arrives -- confirm at the use sites.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    ///
    /// Carries an `Instance`.
    // NOTE(review): presumably the instance whose callback will be invoked --
    // confirm at the use sites.
    Callback(Instance),
}
575
/// Represents the reason a fiber is suspending itself.
// NOTE(review): the meaning of `skip_may_block_check` in the variants below
// is not visible in this part of the file; presumably it bypasses a "may this
// task block?" validation performed when suspending -- confirm at the
// consumer of `SuspendReason`.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        // The waitable set being waited on.
        set: TableId<WaitableSet>,
        // The guest thread doing the waiting.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        // The guest thread which is yielding.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        // The guest thread which is suspending.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
601
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// A pending start whose closure returns no follow-up call.
    ///
    /// `GuestCall::is_ready` always reports this kind as ready, unlike
    /// `StartImplicit`.
    // NOTE(review): what distinguishes "explicit" from "implicit" starts is
    // not visible here -- confirm where these variants are constructed.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
623
624impl fmt::Debug for GuestCallKind {
625    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
626        match self {
627            Self::DeliverEvent { instance, set } => f
628                .debug_struct("DeliverEvent")
629                .field("instance", instance)
630                .field("set", set)
631                .finish(),
632            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
633            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
634        }
635    }
636}
637
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    // The guest thread the call is for; `thread.task` names its task.
    thread: QualifiedThreadId,
    // What kind of call is pending.
    kind: GuestCallKind,
}
644
645impl GuestCall {
646    /// Returns whether or not the call is ready to run.
647    ///
648    /// A call will not be ready to run if either:
649    ///
650    /// - the (sub-)component instance to be called has already been entered and
651    /// cannot be reentered until an in-progress call completes
652    ///
653    /// - the call is for a not-yet started task and the (sub-)component
654    /// instance to be called has backpressure enabled
655    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
656        let instance = store
657            .concurrent_state_mut()
658            .get_mut(self.thread.task)?
659            .instance;
660        let state = store.instance_state(instance);
661
662        let ready = match &self.kind {
663            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
664            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
665            GuestCallKind::StartExplicit(_) => true,
666        };
667        log::trace!(
668            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
669            state.do_not_enter,
670            state.backpressure
671        );
672        Ok(ready)
673    }
674}
675
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary one-shot closure which is given access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
681
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// The `Debug` impl prints only the variant name for every variant except
/// `GuestCall`, whose payload is printed too.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
694
695impl fmt::Debug for WorkItem {
696    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
697        match self {
698            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
699            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
700            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
701            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
702        }
703    }
704}
705
/// Whether a suspension intrinsic was cancelled or completed
///
/// The derived ordering follows declaration order, so `Cancelled` compares
/// less than `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
712
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// # Parameters
///
/// - `store`: the store on whose behalf the call is made.
/// - `future`: the host function's future, producing the call's result.
/// - `caller_instance`: the instance recorded on the temporary host task.
///
/// # Errors
///
/// Returns any error produced by `future`, by the task table operations, or by
/// suspending the current fiber.
///
/// # Panics
///
/// Panics if there is no current guest thread and `future` is not ready on its
/// first poll, or if the stashed result is missing or is not of type `R`.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // If there is no current guest thread set, that means the host function was
    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
    // should complete immediately.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                // Per the comment above such a future is expected to be ready
                // on its first poll, so this is treated as a bug.
                unreachable!()
            }
        };
    };

    // Save any existing result stashed in `GuestTask::result` so we can replace
    // it with the new result.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Add a temporary host task into the table so we can track its progress.
    // Note that we'll never allocate a waitable handle for the guest since
    // we're being called synchronously.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future in an async block which will take care of stashing the
    // result in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // It completed immediately; check the result and delete the task.
            // NOTE(review): if `result` is an error we return here without
            // deleting `task` or restoring `old_result` -- confirm that is
            // acceptable on the error path.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // It did not complete immediately; add it to
            // `ConcurrentState::futures` so it will be polled via the event
            // loop; then use `GuestTask::sync_call_set` to wait for the task to
            // complete, suspending the current fiber until it does so.
            // NOTE(review): `task` is not deleted in this function on this
            // path; presumably that happens elsewhere -- TODO confirm.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;
        }
    }

    // Retrieve and return the result, putting back whatever was stashed in
    // `GuestTask::result` before this call.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
818
/// Execute the specified guest call.
///
/// Runs `call` and then keeps running any follow-up call it yields until none
/// remains:
///
/// - `DeliverEvent` fetches the pending event for the task via `get_event`,
///   invokes the task's callback with it, and hands the returned code to
///   `handle_callback_code`, which may yield another call.
/// - `StartImplicit` runs the start closure, which may yield another call.
/// - `StartExplicit` runs the start closure and yields nothing further.
///
/// # Errors
///
/// Propagates any error from the table lookups, the callback, the start
/// closures, or the instance/call-context bookkeeping.
///
/// # Panics
///
/// Panics if `get_event` yields no event, or if the task has no callback
/// stored, when a `DeliverEvent` call is handled.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // When the event has no associated waitable, pass 0 as the
                // handle.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make the callee the current thread for the duration of the
                // callback, remembering the previous one so it can be restored.
                let old_thread = store.set_thread(Some(call.thread));
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.maybe_push_call_context(call.thread.task)?;

                store.enter_instance(runtime_instance);

                // Take the callback out of the task for the duration of the
                // call (the call itself needs `store`); it is put back below.
                let callback = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                    .unwrap();

                let code = callback(store, runtime_instance.index, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                // Undo the setup above in reverse order.
                store.exit_instance(runtime_instance)?;

                store.maybe_pop_call_context(call.thread.task)?;

                store.set_thread(old_thread);

                // Interpret the code returned by the callback; this may
                // produce another call to run on the next loop iteration.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
890
891impl<T> Store<T> {
892    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
893    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
894    where
895        T: Send + 'static,
896    {
897        self.as_context_mut().run_concurrent(fun).await
898    }
899
900    #[doc(hidden)]
901    pub fn assert_concurrent_state_empty(&mut self) {
902        self.as_context_mut().assert_concurrent_state_empty();
903    }
904
905    /// Convenience wrapper for [`StoreContextMut::spawn`].
906    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
907    where
908        T: 'static,
909    {
910        self.as_context_mut().spawn(task)
911    }
912}
913
914impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked pieces of state is non-empty, if a guest
    /// thread is still recorded as current, or if the set of host futures is
    /// currently taken out of the state (i.e. is `None`).
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        // Per-instance state is checked first, then the store-wide state.
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }
942
    /// Spawn a background task to run as part of this store's event loop.
    ///
    /// The task will receive an `&Accessor<T>` and run concurrently with
    /// any other tasks in progress on the event loop.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// is run, e.g. via [`StoreContextMut::run_concurrent`].
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        // The accessor is built from a token for this store; the store itself
        // is then consumed to queue the task.
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }
959
960    /// Internal implementation of `spawn` functions where a `store` is
961    /// available along with an `Accessor`.
962    fn spawn_with_accessor<D>(
963        self,
964        accessor: Accessor<T, D>,
965        task: impl AccessorTask<T, D>,
966    ) -> JoinHandle
967    where
968        T: 'static,
969        D: HasData + ?Sized,
970    {
971        // Create an "abortable future" here where internally the future will
972        // hook calls to poll and possibly spawn more background tasks on each
973        // iteration.
974        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
975        self.0
976            .concurrent_state_mut()
977            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
978        handle
979    }
980
    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.  This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress before the closure can make progress. This is
    /// an artifact of Wasmtime's historical implementation of `async` functions
    /// and is the topic of [#11869] and [#11870]. In the timeout example from
    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
    /// progress for a readiness notification of (b) to get delivered.
    ///
    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which timeouts for example
    /// are based on. Fixing this requires some relatively major refactoring
    /// work within Wasmtime itself. This is a known pitfall and one that is
    /// intended to be fixed one day. In the meantime it's recommended to apply
    /// timeouts or such to the entire `run_concurrent` call itself rather than
    /// internally.
    ///
    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   wasmtime::{
    /// #     error::{Result},
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        // `false`: keep waiting rather than trapping when the event loop is
        // idle but `fun` has not yet completed.
        self.do_run_concurrent(fun, false).await
    }
1067
    /// Variant of `run_concurrent` which gives up when the event loop goes
    /// idle.
    ///
    /// Behaves like `run_concurrent` except that `trap_on_idle` is set: if
    /// there are no pending host futures and no queued work items left while
    /// the future returned by `fun` still has not completed, the call fails
    /// with `Trap::AsyncDeadlock` instead of continuing to wait.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1077
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` to obtain its future, then drives
    /// that future with `poll_until`, forwarding `trap_on_idle` unchanged.
    ///
    /// The future produced by `fun` is stored in a `Dropper` guard so that it
    /// is always dropped inside `tls::set` with this store installed,
    /// regardless of how this function's own future ends.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects nested event-loop runs; confirm
        // against the definition of `check_recursive_run`.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        /// Guard which owns the store context together with a value and drops
        /// that value while the store is installed via `tls::set`.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        // `fun(accessor)` only creates the future here; it is first polled by
        // `poll_until` below.
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1120
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration of the loop polls, in order: the caller's `future`, the
    /// set of host/background futures in `ConcurrentState::futures`, the "high
    /// priority" work queue, and finally the "low priority" work queue.  Any
    /// work items collected are then handled one at a time via
    /// `handle_work_item` before looping again.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the futures in `ConcurrentState::futures`
    /// completes with an error, if handling a work item fails, or (with
    /// `trap_on_idle` set) with `Trap::AsyncDeadlock` when nothing is left to
    /// do and `future` is still pending.
    ///
    /// # Panics
    ///
    /// Panics if `ConcurrentState::futures` is `None` at the start of an
    /// iteration.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        /// Guard which puts the borrowed set of futures back into the store's
        /// concurrent state when dropped.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // The closure yields `Either::Left(value)` once the caller's
            // future is done, or `Either::Right(items)` with a (possibly
            // empty) batch of work items to handle.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The result is `Ready(true)` if one of them completed,
                // `Ready(false)` if the set is empty, and `Pending` otherwise.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    /// Guard which cleans up any work items that were taken
                    /// off a queue but never handled (e.g. because handling an
                    /// earlier item failed or this future was dropped).
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    // Fibers are disposed of explicitly with
                                    // access to the store.
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Futures are dropped with the store
                                    // installed via `tls::set`.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Items are pulled out one at a time so that whatever is
                    // left in `dispose.ready` on early exit is cleaned up by
                    // the guard above.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1288
1289    /// Handle the specified work item, possibly resuming a fiber if applicable.
1290    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1291    where
1292        T: Send,
1293    {
1294        log::trace!("handle work item {item:?}");
1295        match item {
1296            WorkItem::PushFuture(future) => {
1297                self.0
1298                    .concurrent_state_mut()
1299                    .futures
1300                    .get_mut()
1301                    .as_mut()
1302                    .unwrap()
1303                    .push(future.into_inner());
1304            }
1305            WorkItem::ResumeFiber(fiber) => {
1306                self.0.resume_fiber(fiber).await?;
1307            }
1308            WorkItem::GuestCall(call) => {
1309                if call.is_ready(self.0)? {
1310                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1311                } else {
1312                    let state = self.0.concurrent_state_mut();
1313                    let task = state.get_mut(call.thread.task)?;
1314                    if !task.starting_sent {
1315                        task.starting_sent = true;
1316                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1317                            Waitable::Guest(call.thread.task).set_event(
1318                                state,
1319                                Some(Event::Subtask {
1320                                    status: Status::Starting,
1321                                }),
1322                            )?;
1323                        }
1324                    }
1325
1326                    let instance = state.get_mut(call.thread.task)?.instance;
1327                    self.0
1328                        .instance_state(instance)
1329                        .pending
1330                        .insert(call.thread, call.kind);
1331                }
1332            }
1333            WorkItem::WorkerFunction(fun) => {
1334                self.run_on_worker(WorkerItem::Function(fun)).await?;
1335            }
1336        }
1337
1338        Ok(())
1339    }
1340
1341    /// Execute the specified guest call on a worker fiber.
1342    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1343    where
1344        T: Send,
1345    {
1346        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1347            fiber
1348        } else {
1349            fiber::make_fiber(self.0, move |store| {
1350                loop {
1351                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1352                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1353                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1354                    }
1355
1356                    store.suspend(SuspendReason::NeedWork)?;
1357                }
1358            })?
1359        };
1360
1361        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1362        assert!(worker_item.is_none());
1363        *worker_item = Some(item);
1364
1365        self.0.resume_fiber(worker).await
1366    }
1367
    /// Wrap the specified host function in a future which will call it, passing
    /// it an `&Accessor<T>`.
    ///
    /// A `StoreToken` is created from `self` right away; the `Accessor` is
    /// only constructed, and `closure` only invoked, once the returned future
    /// is polled.  The future resolves to whatever the future returned by
    /// `closure` resolves to.
    ///
    /// See the `Accessor` documentation for details.
    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
    where
        T: 'static,
        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
            + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
    {
        let token = StoreToken::new(self);
        async move {
            // The accessor lives inside the returned future so that the
            // future handed back by `closure` can borrow it.
            let mut accessor = Accessor::new(token);
            closure(&mut accessor).await
        }
    }
1387}
1388
/// Identifies a (sub-)component instance within a store.
///
/// Used as the key for looking up per-instance state such as the concurrent
/// instance state and the handle table.
#[derive(Debug, Copy, Clone)]
pub struct RuntimeInstance {
    /// Id of the component instance within the store; combined with the
    /// store's id to look the instance up.
    pub instance: ComponentInstanceId,
    /// Index selecting the per-instance state within that component instance.
    pub index: RuntimeComponentInstanceIndex,
}
1394
1395impl StoreOpaque {
    /// Helper function to retrieve the `ConcurrentInstanceState` for the
    /// specified instance.
    ///
    /// The component instance is looked up in this store by
    /// `instance.instance`; `instance.index` then selects the per-instance
    /// state within it.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
        StoreComponentInstanceId::new(self.id(), instance.instance)
            .get_mut(self)
            .instance_state(instance.index)
            .concurrent_state()
    }
1404
    /// Helper function to retrieve the `HandleTable` for the specified
    /// instance.
    ///
    /// The component instance is looked up in this store by
    /// `instance.instance`; `instance.index` then selects the per-instance
    /// state which owns the table.
    fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
        StoreComponentInstanceId::new(self.id(), instance.instance)
            .get_mut(self)
            .instance_state(instance.index)
            .handle_table()
    }
1413
    /// Make `thread` the current guest thread and return the previous one.
    ///
    /// Passing `None` clears the current guest thread.  The `task_may_block`
    /// flag of the affected component instances is updated as described in
    /// the comments below.
    ///
    /// # Panics
    ///
    /// Panics if the task of the previous current thread cannot be found in
    /// the table, or (via `Self::set_task_may_block`) if the task of the new
    /// one cannot.
    fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
        // Each time we switch threads, we conservatively set `task_may_block`
        // to `false` for the component instance we're switching away from (if
        // any), meaning it will be `false` for any new thread created for that
        // instance unless explicitly set otherwise.
        let state = self.concurrent_state_mut();
        let old_thread = state.guest_thread.take();
        if let Some(old_thread) = old_thread {
            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
            StoreComponentInstanceId::new(self.id(), instance)
                .get_mut(self)
                .set_task_may_block(false)
        }

        self.concurrent_state_mut().guest_thread = thread;

        // If we're switching to a new thread, set its component instance's
        // `task_may_block` according to where it left off.
        if thread.is_some() {
            self.set_task_may_block();
        }

        old_thread
    }
1438
1439    /// Set the global variable representing whether the current task may block
1440    /// prior to entering Wasm code.
1441    fn set_task_may_block(&mut self) {
1442        let state = self.concurrent_state_mut();
1443        let guest_thread = state.guest_thread.unwrap();
1444        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1445        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1446        StoreComponentInstanceId::new(self.id(), instance)
1447            .get_mut(self)
1448            .set_task_may_block(may_block)
1449    }
1450
1451    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1452        let state = self.concurrent_state_mut();
1453        let task = state.guest_thread.unwrap().task;
1454        let instance = state.get_mut(task).unwrap().instance.instance;
1455        let task_may_block = StoreComponentInstanceId::new(self.id(), instance)
1456            .get_mut(self)
1457            .get_task_may_block();
1458
1459        if task_may_block {
1460            Ok(())
1461        } else {
1462            Err(Trap::CannotBlockSyncTask.into())
1463        }
1464    }
1465
    /// Record that we're about to enter a (sub-)component instance which does
    /// not support more than one concurrent, stackful activation, meaning it
    /// cannot be entered again until the next call returns.
    ///
    /// This sets the instance's `do_not_enter` flag; `Self::exit_instance`
    /// clears it again.
    fn enter_instance(&mut self, instance: RuntimeInstance) {
        log::trace!("enter {instance:?}");
        self.instance_state(instance).do_not_enter = true;
    }
1473
    /// Record that we've exited a (sub-)component instance previously entered
    /// with `Self::enter_instance`, then call `Self::partition_pending` for
    /// that instance.  See the documentation for the latter for details.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Self::partition_pending`.
    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
        log::trace!("exit {instance:?}");
        // Clear the flag first so that the readiness checks performed by
        // `partition_pending` see the instance as exited.
        self.instance_state(instance).do_not_enter = false;
        self.partition_pending(instance)
    }
1482
1483    /// Iterate over `InstanceState::pending`, moving any ready items into the
1484    /// "high priority" work item queue.
1485    ///
1486    /// See `GuestCall::is_ready` for details.
1487    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1488        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1489            let call = GuestCall { thread, kind };
1490            if call.is_ready(self)? {
1491                self.concurrent_state_mut()
1492                    .push_high_priority(WorkItem::GuestCall(call));
1493            } else {
1494                self.instance_state(instance)
1495                    .pending
1496                    .insert(call.thread, call.kind);
1497            }
1498        }
1499
1500        Ok(())
1501    }
1502
1503    /// Implements the `backpressure.{inc,dec}` intrinsics.
1504    pub(crate) fn backpressure_modify(
1505        &mut self,
1506        caller_instance: RuntimeInstance,
1507        modify: impl FnOnce(u16) -> Option<u16>,
1508    ) -> Result<()> {
1509        let state = self.instance_state(caller_instance);
1510        let old = state.backpressure;
1511        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1512        state.backpressure = new;
1513
1514        if old > 0 && new == 0 {
1515            // Backpressure was previously enabled and is now disabled; move any
1516            // newly-eligible guest calls to the "high priority" queue.
1517            self.partition_pending(caller_instance)?;
1518        }
1519
1520        Ok(())
1521    }
1522
1523    /// Resume the specified fiber, giving it exclusive access to the specified
1524    /// store.
1525    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1526        let old_thread = self.concurrent_state_mut().guest_thread;
1527        log::trace!("resume_fiber: save current thread {old_thread:?}");
1528
1529        let fiber = fiber::resolve_or_release(self, fiber).await?;
1530
1531        self.set_thread(old_thread);
1532
1533        let state = self.concurrent_state_mut();
1534
1535        if let Some(ref ot) = old_thread {
1536            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1537        }
1538        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1539
1540        if let Some(mut fiber) = fiber {
1541            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1542            // See the `SuspendReason` documentation for what each case means.
1543            match state.suspend_reason.take().unwrap() {
1544                SuspendReason::NeedWork => {
1545                    if state.worker.is_none() {
1546                        state.worker = Some(fiber);
1547                    } else {
1548                        fiber.dispose(self);
1549                    }
1550                }
1551                SuspendReason::Yielding { thread, .. } => {
1552                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1553                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1554                }
1555                SuspendReason::ExplicitlySuspending { thread, .. } => {
1556                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1557                }
1558                SuspendReason::Waiting { set, thread, .. } => {
1559                    let old = state
1560                        .get_mut(set)?
1561                        .waiting
1562                        .insert(thread, WaitMode::Fiber(fiber));
1563                    assert!(old.is_none());
1564                }
1565            };
1566        } else {
1567            log::trace!("resume_fiber: fiber has exited");
1568        }
1569
1570        Ok(())
1571    }
1572
1573    /// Suspend the current fiber, storing the reason in
1574    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1575    /// it should be resumed.
1576    ///
1577    /// See the `SuspendReason` documentation for details.
1578    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1579        log::trace!("suspend fiber: {reason:?}");
1580
1581        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1582        // pop the call context which manages resource borrows before suspending
1583        // and then push it again once we've resumed.
1584        let task = match &reason {
1585            SuspendReason::Yielding { thread, .. }
1586            | SuspendReason::Waiting { thread, .. }
1587            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1588            SuspendReason::NeedWork => None,
1589        };
1590
1591        let old_guest_thread = if let Some(task) = task {
1592            self.maybe_pop_call_context(task)?;
1593            self.concurrent_state_mut().guest_thread
1594        } else {
1595            None
1596        };
1597
1598        // We should not have reached here unless either there's no current
1599        // task, or the current task is permitted to block.  In addition, we
1600        // special-case `thread.switch-to` and waiting for a subtask to go from
1601        // `starting` to `started`, both of which we consider non-blocking
1602        // operations despite requiring a suspend.
1603        assert!(
1604            matches!(
1605                reason,
1606                SuspendReason::ExplicitlySuspending {
1607                    skip_may_block_check: true,
1608                    ..
1609                } | SuspendReason::Waiting {
1610                    skip_may_block_check: true,
1611                    ..
1612                } | SuspendReason::Yielding {
1613                    skip_may_block_check: true,
1614                    ..
1615                }
1616            ) || old_guest_thread
1617                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1618                .unwrap_or(true)
1619        );
1620
1621        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1622        assert!(suspend_reason.is_none());
1623        *suspend_reason = Some(reason);
1624
1625        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1626
1627        if let Some(task) = task {
1628            self.set_thread(old_guest_thread);
1629            self.maybe_push_call_context(task)?;
1630        }
1631
1632        Ok(())
1633    }
1634
1635    /// Push the call context for managing resource borrows for the specified
1636    /// guest task if it has not yet either returned a result or cancelled
1637    /// itself.
1638    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1639        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1640
1641        if !task.returned_or_cancelled() {
1642            log::trace!("push call context for {guest_task:?}");
1643            let call_context = task.call_context.take().unwrap();
1644            self.component_resource_state().0.push(call_context);
1645        }
1646        Ok(())
1647    }
1648
1649    /// Pop the call context for managing resource borrows for the specified
1650    /// guest task if it has not yet either returned a result or cancelled
1651    /// itself.
1652    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1653        if !self
1654            .concurrent_state_mut()
1655            .get_mut(guest_task)?
1656            .returned_or_cancelled()
1657        {
1658            log::trace!("pop call context for {guest_task:?}");
1659            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1660            self.concurrent_state_mut()
1661                .get_mut(guest_task)?
1662                .call_context = call_context;
1663        }
1664        Ok(())
1665    }
1666
1667    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1668        let state = self.concurrent_state_mut();
1669        let caller = state.guest_thread.unwrap();
1670        let old_set = waitable.common(state)?.set;
1671        let set = state.get_mut(caller.task)?.sync_call_set;
1672        waitable.join(state, Some(set))?;
1673        self.suspend(SuspendReason::Waiting {
1674            set,
1675            thread: caller,
1676            skip_may_block_check: false,
1677        })?;
1678        let state = self.concurrent_state_mut();
1679        waitable.join(state, old_set)
1680    }
1681}
1682
1683impl Instance {
1684    /// Get the next pending event for the specified task and (optional)
1685    /// waitable set, along with the waitable handle if applicable.
1686    fn get_event(
1687        self,
1688        store: &mut StoreOpaque,
1689        guest_task: TableId<GuestTask>,
1690        set: Option<TableId<WaitableSet>>,
1691        cancellable: bool,
1692    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1693        let state = store.concurrent_state_mut();
1694
1695        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1696            log::trace!("deliver event {event:?} to {guest_task:?}");
1697
1698            if cancellable || !matches!(event, Event::Cancelled) {
1699                return Ok(Some((event, None)));
1700            } else {
1701                state.get_mut(guest_task)?.event = Some(event);
1702            }
1703        }
1704
1705        Ok(
1706            if let Some((set, waitable)) = set
1707                .and_then(|set| {
1708                    state
1709                        .get_mut(set)
1710                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1711                        .transpose()
1712                })
1713                .transpose()?
1714            {
1715                let common = waitable.common(state)?;
1716                let handle = common.handle.unwrap();
1717                let event = common.event.take().unwrap();
1718
1719                log::trace!(
1720                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1721                );
1722
1723                waitable.on_delivery(store, self, event);
1724
1725                Some((event, Some((waitable, handle))))
1726            } else {
1727                None
1728            },
1729        )
1730    }
1731
1732    /// Handle the `CallbackCode` returned from an async-lifted export or its
1733    /// callback.
1734    ///
1735    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1736    /// using `handle_guest_call`.
1737    fn handle_callback_code(
1738        self,
1739        store: &mut StoreOpaque,
1740        guest_thread: QualifiedThreadId,
1741        runtime_instance: RuntimeComponentInstanceIndex,
1742        code: u32,
1743    ) -> Result<Option<GuestCall>> {
1744        let (code, set) = unpack_callback_code(code);
1745
1746        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1747
1748        let state = store.concurrent_state_mut();
1749
1750        let get_set = |store: &mut StoreOpaque, handle| {
1751            if handle == 0 {
1752                bail!("invalid waitable-set handle");
1753            }
1754
1755            let set = store
1756                .handle_table(RuntimeInstance {
1757                    instance: self.id().instance(),
1758                    index: runtime_instance,
1759                })
1760                .waitable_set_rep(handle)?;
1761
1762            Ok(TableId::<WaitableSet>::new(set))
1763        };
1764
1765        Ok(match code {
1766            callback_code::EXIT => {
1767                log::trace!("implicit thread {guest_thread:?} completed");
1768                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1769                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1770                if task.threads.is_empty() && !task.returned_or_cancelled() {
1771                    bail!(Trap::NoAsyncResult);
1772                }
1773                match &task.caller {
1774                    Caller::Host { .. } => {
1775                        if task.ready_to_delete() {
1776                            Waitable::Guest(guest_thread.task)
1777                                .delete_from(store.concurrent_state_mut())?;
1778                        }
1779                    }
1780                    Caller::Guest { .. } => {
1781                        task.exited = true;
1782                        task.callback = None;
1783                    }
1784                }
1785                None
1786            }
1787            callback_code::YIELD => {
1788                let task = state.get_mut(guest_thread.task)?;
1789                // If an `Event::Cancelled` is pending, we'll deliver that;
1790                // otherwise, we'll deliver `Event::None`.  Note that
1791                // `GuestTask::event` is only ever set to one of those two
1792                // `Event` variants.
1793                if let Some(event) = task.event {
1794                    assert!(matches!(event, Event::None | Event::Cancelled));
1795                } else {
1796                    task.event = Some(Event::None);
1797                }
1798                let call = GuestCall {
1799                    thread: guest_thread,
1800                    kind: GuestCallKind::DeliverEvent {
1801                        instance: self,
1802                        set: None,
1803                    },
1804                };
1805                if state.may_block(guest_thread.task) {
1806                    // Push this thread onto the "low priority" queue so it runs
1807                    // after any other threads have had a chance to run.
1808                    state.push_low_priority(WorkItem::GuestCall(call));
1809                    None
1810                } else {
1811                    // Yielding in a non-blocking context is defined as a no-op
1812                    // according to the spec, so we must run this thread
1813                    // immediately without allowing any others to run.
1814                    Some(call)
1815                }
1816            }
1817            callback_code::WAIT => {
1818                // The task may only return `WAIT` if it was created for a call
1819                // to an async export).  Otherwise, we'll trap.
1820                state.check_blocking_for(guest_thread.task)?;
1821
1822                let set = get_set(store, set)?;
1823                let state = store.concurrent_state_mut();
1824
1825                if state.get_mut(guest_thread.task)?.event.is_some()
1826                    || !state.get_mut(set)?.ready.is_empty()
1827                {
1828                    // An event is immediately available; deliver it ASAP.
1829                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1830                        thread: guest_thread,
1831                        kind: GuestCallKind::DeliverEvent {
1832                            instance: self,
1833                            set: Some(set),
1834                        },
1835                    }));
1836                } else {
1837                    // No event is immediately available.
1838                    //
1839                    // We're waiting, so register to be woken up when an event
1840                    // is published for this waitable set.
1841                    //
1842                    // Here we also set `GuestTask::wake_on_cancel` which allows
1843                    // `subtask.cancel` to interrupt the wait.
1844                    let old = state
1845                        .get_mut(guest_thread.thread)?
1846                        .wake_on_cancel
1847                        .replace(set);
1848                    assert!(old.is_none());
1849                    let old = state
1850                        .get_mut(set)?
1851                        .waiting
1852                        .insert(guest_thread, WaitMode::Callback(self));
1853                    assert!(old.is_none());
1854                }
1855                None
1856            }
1857            _ => bail!("unsupported callback code: {code}"),
1858        })
1859    }
1860
1861    fn cleanup_thread(
1862        self,
1863        store: &mut StoreOpaque,
1864        guest_thread: QualifiedThreadId,
1865        runtime_instance: RuntimeComponentInstanceIndex,
1866    ) -> Result<()> {
1867        let guest_id = store
1868            .concurrent_state_mut()
1869            .get_mut(guest_thread.thread)?
1870            .instance_rep;
1871        store
1872            .handle_table(RuntimeInstance {
1873                instance: self.id().instance(),
1874                index: runtime_instance,
1875            })
1876            .guest_thread_remove(guest_id.unwrap())?;
1877
1878        store.concurrent_state_mut().delete(guest_thread.thread)?;
1879        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1880        task.threads.remove(&guest_thread.thread);
1881        Ok(())
1882    }
1883
1884    /// Add the specified guest call to the "high priority" work item queue, to
1885    /// be started as soon as backpressure and/or reentrance rules allow.
1886    ///
1887    /// SAFETY: The raw pointer arguments must be valid references to guest
1888    /// functions (with the appropriate signatures) when the closures queued by
1889    /// this function are called.
1890    unsafe fn queue_call<T: 'static>(
1891        self,
1892        mut store: StoreContextMut<T>,
1893        guest_thread: QualifiedThreadId,
1894        callee: SendSyncPtr<VMFuncRef>,
1895        param_count: usize,
1896        result_count: usize,
1897        flags: Option<InstanceFlags>,
1898        async_: bool,
1899        callback: Option<SendSyncPtr<VMFuncRef>>,
1900        post_return: Option<SendSyncPtr<VMFuncRef>>,
1901    ) -> Result<()> {
1902        /// Return a closure which will call the specified function in the scope
1903        /// of the specified task.
1904        ///
1905        /// This will use `GuestTask::lower_params` to lower the parameters, but
1906        /// will not lift the result; instead, it returns a
1907        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1908        /// any, may be lifted.  Note that an async-lifted export will have
1909        /// returned its result using the `task.return` intrinsic (or not
1910        /// returned a result at all, in the case of `task.cancel`), in which
1911        /// case the "result" of this call will either be a callback code or
1912        /// nothing.
1913        ///
1914        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1915        /// the returned closure is called.
1916        unsafe fn make_call<T: 'static>(
1917            store: StoreContextMut<T>,
1918            guest_thread: QualifiedThreadId,
1919            callee: SendSyncPtr<VMFuncRef>,
1920            param_count: usize,
1921            result_count: usize,
1922            flags: Option<InstanceFlags>,
1923        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1924        + Send
1925        + Sync
1926        + 'static
1927        + use<T> {
1928            let token = StoreToken::new(store);
1929            move |store: &mut dyn VMStore| {
1930                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1931
1932                store
1933                    .concurrent_state_mut()
1934                    .get_mut(guest_thread.thread)?
1935                    .state = GuestThreadState::Running;
1936                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1937                let may_enter_after_call = task.call_post_return_automatically();
1938                let lower = task.lower_params.take().unwrap();
1939
1940                lower(store, &mut storage[..param_count])?;
1941
1942                let mut store = token.as_context_mut(store);
1943
1944                // SAFETY: Per the contract documented in `make_call's`
1945                // documentation, `callee` must be a valid pointer.
1946                unsafe {
1947                    if let Some(mut flags) = flags {
1948                        flags.set_may_enter(false);
1949                    }
1950                    crate::Func::call_unchecked_raw(
1951                        &mut store,
1952                        callee.as_non_null(),
1953                        NonNull::new(
1954                            &mut storage[..param_count.max(result_count)]
1955                                as *mut [MaybeUninit<ValRaw>] as _,
1956                        )
1957                        .unwrap(),
1958                    )?;
1959                    if let Some(mut flags) = flags {
1960                        flags.set_may_enter(may_enter_after_call);
1961                    }
1962                }
1963
1964                Ok(storage)
1965            }
1966        }
1967
1968        // SAFETY: Per the contract described in this function documentation,
1969        // the `callee` pointer which `call` closes over must be valid when
1970        // called by the closure we queue below.
1971        let call = unsafe {
1972            make_call(
1973                store.as_context_mut(),
1974                guest_thread,
1975                callee,
1976                param_count,
1977                result_count,
1978                flags,
1979            )
1980        };
1981
1982        let callee_instance = store
1983            .0
1984            .concurrent_state_mut()
1985            .get_mut(guest_thread.task)?
1986            .instance;
1987
1988        let fun = if callback.is_some() {
1989            assert!(async_);
1990
1991            Box::new(move |store: &mut dyn VMStore| {
1992                self.add_guest_thread_to_instance_table(
1993                    guest_thread.thread,
1994                    store,
1995                    callee_instance.index,
1996                )?;
1997                let old_thread = store.set_thread(Some(guest_thread));
1998                log::trace!(
1999                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2000                );
2001
2002                store.maybe_push_call_context(guest_thread.task)?;
2003
2004                store.enter_instance(callee_instance);
2005
2006                // SAFETY: See the documentation for `make_call` to review the
2007                // contract we must uphold for `call` here.
2008                //
2009                // Per the contract described in the `queue_call`
2010                // documentation, the `callee` pointer which `call` closes
2011                // over must be valid.
2012                let storage = call(store)?;
2013
2014                store.exit_instance(callee_instance)?;
2015
2016                store.maybe_pop_call_context(guest_thread.task)?;
2017
2018                store.set_thread(old_thread);
2019                let state = store.concurrent_state_mut();
2020                old_thread
2021                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2022                log::trace!("stackless call: restored {old_thread:?} as current thread");
2023
2024                // SAFETY: `wasmparser` will have validated that the callback
2025                // function returns a `i32` result.
2026                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2027
2028                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2029            })
2030                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2031        } else {
2032            let token = StoreToken::new(store.as_context_mut());
2033            Box::new(move |store: &mut dyn VMStore| {
2034                self.add_guest_thread_to_instance_table(
2035                    guest_thread.thread,
2036                    store,
2037                    callee_instance.index,
2038                )?;
2039                let old_thread = store.set_thread(Some(guest_thread));
2040                log::trace!(
2041                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2042                );
2043                let mut flags = self.id().get(store).instance_flags(callee_instance.index);
2044
2045                store.maybe_push_call_context(guest_thread.task)?;
2046
2047                // Unless this is a callback-less (i.e. stackful)
2048                // async-lifted export, we need to record that the instance
2049                // cannot be entered until the call returns.
2050                if !async_ {
2051                    store.enter_instance(callee_instance);
2052                }
2053
2054                // SAFETY: See the documentation for `make_call` to review the
2055                // contract we must uphold for `call` here.
2056                //
2057                // Per the contract described in the `queue_call`
2058                // documentation, the `callee` pointer which `call` closes
2059                // over must be valid.
2060                let storage = call(store)?;
2061
2062                // This is a callback-less call, so the implicit thread has now completed
2063                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2064
2065                if async_ {
2066                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2067                    if task.threads.is_empty() && !task.returned_or_cancelled() {
2068                        bail!(Trap::NoAsyncResult);
2069                    }
2070                } else {
2071                    // This is a sync-lifted export, so now is when we lift the
2072                    // result, optionally call the post-return function, if any,
2073                    // and finally notify any current or future waiters that the
2074                    // subtask has returned.
2075
2076                    let lift = {
2077                        store.exit_instance(callee_instance)?;
2078
2079                        let state = store.concurrent_state_mut();
2080                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2081
2082                        state
2083                            .get_mut(guest_thread.task)?
2084                            .lift_result
2085                            .take()
2086                            .unwrap()
2087                    };
2088
2089                    // SAFETY: `result_count` represents the number of core Wasm
2090                    // results returned, per `wasmparser`.
2091                    let result = (lift.lift)(store, unsafe {
2092                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2093                            &storage[..result_count],
2094                        )
2095                    })?;
2096
2097                    let post_return_arg = match result_count {
2098                        0 => ValRaw::i32(0),
2099                        // SAFETY: `result_count` represents the number of
2100                        // core Wasm results returned, per `wasmparser`.
2101                        1 => unsafe { storage[0].assume_init() },
2102                        _ => unreachable!(),
2103                    };
2104
2105                    if store
2106                        .concurrent_state_mut()
2107                        .get_mut(guest_thread.task)?
2108                        .call_post_return_automatically()
2109                    {
2110                        unsafe {
2111                            flags.set_may_leave(false);
2112                            flags.set_needs_post_return(false);
2113                        }
2114
2115                        if let Some(func) = post_return {
2116                            let mut store = token.as_context_mut(store);
2117
2118                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
2119                            // either `wasmtime-cranelift`-generated fused adapter
2120                            // code or `component::Options`.  Per `wasmparser`
2121                            // post-return signature validation, we know it takes a
2122                            // single parameter.
2123                            unsafe {
2124                                crate::Func::call_unchecked_raw(
2125                                    &mut store,
2126                                    func.as_non_null(),
2127                                    slice::from_ref(&post_return_arg).into(),
2128                                )?;
2129                            }
2130                        }
2131
2132                        unsafe {
2133                            flags.set_may_leave(true);
2134                            flags.set_may_enter(true);
2135                        }
2136                    }
2137
2138                    self.task_complete(
2139                        store,
2140                        guest_thread.task,
2141                        result,
2142                        Status::Returned,
2143                        post_return_arg,
2144                    )?;
2145                }
2146
2147                store.set_thread(old_thread);
2148
2149                store.maybe_pop_call_context(guest_thread.task)?;
2150
2151                let state = store.concurrent_state_mut();
2152                let task = state.get_mut(guest_thread.task)?;
2153
2154                match &task.caller {
2155                    Caller::Host { .. } => {
2156                        if task.ready_to_delete() {
2157                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2158                        }
2159                    }
2160                    Caller::Guest { .. } => {
2161                        task.exited = true;
2162                    }
2163                }
2164
2165                Ok(None)
2166            })
2167        };
2168
2169        store
2170            .0
2171            .concurrent_state_mut()
2172            .push_high_priority(WorkItem::GuestCall(GuestCall {
2173                thread: guest_thread,
2174                kind: GuestCallKind::StartImplicit(fun),
2175            }));
2176
2177        Ok(())
2178    }
2179
2180    /// Prepare (but do not start) a guest->guest call.
2181    ///
2182    /// This is called from fused adapter code generated in
2183    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2184    /// are synthesized Wasm functions which move the parameters from the caller
2185    /// to the callee and the result from the callee to the caller,
2186    /// respectively.  The adapter will call `Self::start_call` immediately
2187    /// after calling this function.
2188    ///
2189    /// SAFETY: All the pointer arguments must be valid pointers to guest
2190    /// entities (and with the expected signatures for the function references
2191    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: u8,
        caller_info: CallerInfo,
    ) -> Result<()> {
        // Bail out early if `check_may_leave` rejects the caller's instance.
        self.id().get(store.0).check_may_leave(caller_instance)?;

        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // Describes where the callee's result ends up on the caller's side:
        // either behind a pointer supplied by the caller (`Heap`, where
        // `results` is the value of the caller's last parameter) or as
        // `result_count` flat values (`Stack`).
        enum ResultInfo {
            Heap { results: u32 },
            Stack { result_count: u32 },
        }

        let result_info = match &caller_info {
            // Async caller with a result: the last parameter is the return
            // pointer.
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: params.last().unwrap().get_u32(),
            },
            // Async caller without a result: nothing to deliver.
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            // Sync caller whose results exceed `MAX_FLAT_RESULTS`: as above,
            // the last parameter is read as the location for the results.
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
                results: params.last().unwrap().get_u32(),
            },
            // Sync caller whose results fit in flat values.
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        // Captured by the result-lifting closure below, which only records a
        // `sync_result` for sync callers.
        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        // The thread that was current on entry is the caller.  It is taken
        // out of `guest_thread` here and recorded as the new task's
        // `Caller::Guest` thread below (which panics if there was none).
        let old_thread = state.guest_thread.take();
        let new_task = GuestTask::new(
            state,
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                // Copy the caller's flat parameters into `src` and remember
                // how many were written.
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes count parameters and returns
                // `dst.len()` results.
                //
                // The slice handed to the call covers whichever is larger of
                // the parameter count and the result count, since the same
                // buffer is read back as the results below.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                // The first `dst.len()` slots of `src` now hold the values to
                // hand to the callee.
                dst.copy_from_slice(&src[..dst.len()]);
                // Record a `Started` event on the current task's waitable;
                // `Self::start_call` takes this event when it resumes.
                let state = store.0.concurrent_state_mut();
                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                lift: Box::new(move |store, src| {
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    // When the result goes through the heap, pass the saved
                    // return pointer to `return_` as an extra trailing
                    // argument.
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }
                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }
                    let state = store.0.concurrent_state_mut();
                    let thread = state.guest_thread.unwrap();
                    if sync_caller {
                        // Stash the (at most one) flat result on the task;
                        // `Self::start_call` takes it from
                        // `GuestTask::sync_result` and copies it into the
                        // caller's storage.
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    // Nothing is carried in the boxed result itself; a
                    // placeholder is returned.
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                // A null `memory` pointer becomes `None`.
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
            },
            Caller::Guest {
                thread: old_thread.unwrap(),
                instance: caller_instance,
            },
            None,
            self,
            callee_instance,
            callee_async,
        )?;

        // Add the task to the table, then create its implicit thread and
        // register that thread with the task.
        let guest_task = state.push(new_task)?;
        let new_thread = GuestThread::new_implicit(guest_task);
        let guest_thread = state.push(new_thread)?;
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        let state = store.0.concurrent_state_mut();
        if let Some(old_thread) = old_thread {
            // Trap if `may_enter` says the new task may not be entered;
            // otherwise record it as a subtask of the caller's task.
            if !state.may_enter(guest_task) {
                bail!(crate::Trap::CannotEnterComponent);
            }

            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
        };

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(Some(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        }));
        log::trace!(
            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
        );

        Ok(())
    }
2382
2383    /// Call the specified callback function for an async-lifted export.
2384    ///
2385    /// SAFETY: `function` must be a valid reference to a guest function of the
2386    /// correct signature for a callback.
2387    unsafe fn call_callback<T>(
2388        self,
2389        mut store: StoreContextMut<T>,
2390        callee_instance: RuntimeComponentInstanceIndex,
2391        function: SendSyncPtr<VMFuncRef>,
2392        event: Event,
2393        handle: u32,
2394        may_enter_after_call: bool,
2395    ) -> Result<u32> {
2396        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2397
2398        let (ordinal, result) = event.parts();
2399        let params = &mut [
2400            ValRaw::u32(ordinal),
2401            ValRaw::u32(handle),
2402            ValRaw::u32(result),
2403        ];
2404        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2405        // `wasmtime-cranelift`-generated fused adapter code or
2406        // `component::Options`.  Per `wasmparser` callback signature
2407        // validation, we know it takes three parameters and returns one.
2408        unsafe {
2409            flags.set_may_enter(false);
2410            crate::Func::call_unchecked_raw(
2411                &mut store,
2412                function.as_non_null(),
2413                params.as_mut_slice().into(),
2414            )?;
2415            flags.set_may_enter(may_enter_after_call);
2416        }
2417        Ok(params[0].get_u32())
2418    }
2419
    /// Start a guest->guest call previously prepared using
    /// `Self::prepare_call`.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
    /// this function immediately after calling `Self::prepare_call`.
    ///
    /// `storage` is `None` when the caller used an async-lowered import and
    /// `Some` when it used a sync-lowered one; in the latter case the flat
    /// result stashed in `GuestTask::sync_result`, if any, is written to
    /// `storage[0]` before returning.
    ///
    /// On success this returns `status.pack(waitable)`, where `status` is the
    /// last subtask status observed and `waitable` is the subtask handle
    /// allocated for an async caller whose callee has not yet returned (and
    /// `None` otherwise).
    ///
    /// Panics if there is no current guest thread, if `callee` is null, or if
    /// `param_count`/`result_count` exceed `MAX_FLAT_PARAMS`/`MAX_FLAT_RESULTS`.
    ///
    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
    /// functions with the appropriate signatures for the current guest task.
    /// If this is a call to an async-lowered import, the actual call may be
    /// deferred and run after this function returns, in which case the pointer
    /// arguments must also be valid when the call happens.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // No result storage means the caller is using an async-lowered import.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        // `Self::prepare_call` made the callee's thread the current one.
        let guest_thread = state.guest_thread.unwrap();
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        // Passed to `Self::call_callback`, which sets the callee instance's
        // "may enter" flag to this value after each callback invocation.
        let may_enter_after_call = state
            .get_mut(guest_thread.task)?
            .call_post_return_automatically();
        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
        let param_count = usize::try_from(param_count).unwrap();
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count).unwrap();
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if !callback.is_null() {
            // We're calling an async-lifted export with a callback, so store
            // the callback and related context as part of the task so we can
            // call it later when needed.
            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
                let store = token.as_context_mut(store);
                unsafe {
                    self.call_callback::<T>(
                        store,
                        runtime_instance,
                        callback,
                        event,
                        handle,
                        may_enter_after_call,
                    )
                }
            }));
        }

        let Caller::Guest {
            thread: caller,
            instance: runtime_instance,
        } = &task.caller
        else {
            // As of this writing, `start_call` is only used for guest->guest
            // calls.
            unreachable!()
        };
        // Copy these out so the borrow of `task` ends here.
        let caller = *caller;
        let caller_instance = *runtime_instance;

        let callee_instance = task.instance;

        // The callee's instance flags are only looked up (and handed to
        // `queue_call`) when a callback was supplied.
        let instance_flags = if callback.is_null() {
            None
        } else {
            Some(self.id().get(store.0).instance_flags(callee_instance.index))
        };

        // Queue the call as a "high priority" work item.
        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                instance_flags,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Use the caller's `GuestTask::sync_call_set` to register interest in
        // the subtask...
        let guest_waitable = Waitable::Guest(guest_thread.task);
        // Remember the set the subtask's waitable was joined to (if any) so it
        // can be restored once we are done waiting.
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        // ... and suspend this fiber temporarily while we wait for it to start.
        //
        // Note that we _could_ call the callee directly using the current fiber
        // rather than suspend this one, but that would make reasoning about the
        // event loop more complicated and is probably only worth doing if
        // there's a measurable performance benefit.  In addition, it would mean
        // blocking the caller if the callee calls a blocking sync-lowered
        // import, and as of this writing the spec says we must not do that.
        //
        // Alternatively, the fused adapter code could be modified to call the
        // callee directly without calling a host-provided intrinsic at all (in
        // which case it would need to do its own, inline backpressure checks,
        // etc.).  Again, we'd want to see a measurable performance benefit
        // before committing to such an optimization.  And again, we'd need to
        // update the spec to allow that.
        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // Normally, `StoreOpaque::suspend` would assert it's being
                // called from a context where blocking is allowed.  However, if
                // `async_caller` is `true`, we'll only "block" long enough for
                // the callee to start, i.e. we won't repeat this loop, so we
                // tell `suspend` it's okay even if we're not allowed to block.
                // Alternatively, if the callee is not an async function, then
                // we know it won't block anyway.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            // The only event expected on the subtask's waitable at this point
            // is a subtask status update; anything else is treated as a bug.
            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                unreachable!();
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // It returned, so we can stop waiting.
                break (status, None);
            } else if async_caller {
                // It hasn't returned yet, but the caller is calling via an
                // async-lowered import, so we generate a handle for the task
                // waitable and return the status.
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_guest(guest_thread.task.rep())?;
                // Record the handle on the task as well.
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // The callee hasn't returned yet, and the caller is calling via
                // a sync-lowered import, so we loop and keep waiting until the
                // callee returns.
            }
        };

        // Put the subtask's waitable back in whichever set (if any) it was in
        // before we borrowed it for the wait above.
        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // Reset the current thread to point to the caller as it resumes control.
        store.0.set_thread(Some(caller));
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            // The caller used a sync-lowered import to call an async-lifted
            // export, in which case the result, if any, has been stashed in
            // `GuestTask::sync_result`.
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take() {
                // A produced result may still carry no value (e.g. zero flat
                // results); only write to `storage` when there is one.
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                // Once the result has been consumed, delete the task if it has
                // exited and reports itself ready to delete.
                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        Ok(status.pack(waitable))
    }
2615
    /// Poll the specified future once on behalf of a guest->host call using an
    /// async-lowered import.
    ///
    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
    /// `Pending`, add it to the set of futures to be polled as part of this
    /// instance's event loop until it completes, and then return
    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
    ///
    /// Whether the future returns `Ready` immediately or later, the `lower`
    /// function will be used to lower the result, if any, into the guest caller's
    /// stack and linear memory unless the task has been cancelled.
    ///
    /// An `Err` produced by the future, or by `lower`, is propagated to the
    /// caller of whichever code ends up observing it (this function for the
    /// immediate case, the queued future/work item otherwise).
    ///
    /// Panics if there is no current guest thread.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        caller_instance: RuntimeComponentInstanceIndex,
        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        // Only used for trace logging below.
        let caller = state.guest_thread.unwrap();

        // Create an abortable future which hooks calls to poll and manages call
        // context state for the future.
        let (join_handle, future) = JoinHandle::run(async move {
            let mut future = pin!(future);
            // Holds the call context popped while the future is pending.  It
            // starts out `None`, so nothing is pushed on the very first poll.
            let mut call_context = None;
            future::poll_fn(move |cx| {
                // Push the call context for managing any resource borrows
                // for the task.
                tls::get(|store| {
                    if let Some(call_context) = call_context.take() {
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .push(call_context);
                    }
                });

                let result = future.as_mut().poll(cx);

                if result.is_pending() {
                    // Pop the call context for managing any resource
                    // borrows for the task.
                    tls::get(|store| {
                        call_context = Some(
                            token
                                .as_context_mut(store)
                                .0
                                .component_resource_state()
                                .0
                                .pop()
                                .unwrap(),
                        );
                    });
                }
                // Note that when the future is ready the context is left in
                // place rather than popped here.
                result
            })
            .await
        });

        // We create a new host task even though it might complete immediately
        // (in which case we won't need to pass a waitable back to the guest).
        // If it does complete immediately, we'll remove it before we return.
        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

        log::trace!("new host task child of {caller:?}: {task:?}");

        let mut future = Box::pin(future);

        // Finally, poll the future.  We can use a dummy `Waker` here because
        // we'll add the future to `ConcurrentState::futures` and poll it
        // automatically from the event loop if it doesn't complete immediately
        // here.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        Ok(match poll {
            // `None` is what the wrapped future yields when the task was
            // cancelled (see the `Pending` arm below); that is not expected on
            // this first poll.
            Poll::Ready(None) => unreachable!(),
            Poll::Ready(Some(result)) => {
                // It finished immediately; lower the result and delete the
                // task.
                lower(store.as_context_mut(), result?)?;
                log::trace!("delete host task {task:?} (already ready)");
                store.0.concurrent_state_mut().delete(task)?;
                None
            }
            Poll::Pending => {
                // It hasn't finished yet; add the future to
                // `ConcurrentState::futures` so it will be polled by the event
                // loop and allocate a waitable handle to return to the guest.

                // Wrap the future in a closure responsible for lowering the result into
                // the guest's stack and memory, as well as notifying any waiters that
                // the task returned.
                let future =
                    Box::pin(async move {
                        let result = match future.await {
                            Some(result) => result?,
                            // Task was cancelled; nothing left to do.
                            None => return Ok(()),
                        };
                        tls::get(move |store| {
                            // Here we schedule a task to run on a worker fiber to do
                            // the lowering since it may involve a call to the guest's
                            // realloc function.  This is necessary because calling the
                            // guest while there are host embedder frames on the stack
                            // is unsound.
                            store.concurrent_state_mut().push_high_priority(
                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                    lower(token.as_context_mut(store), result)?;
                                    let state = store.concurrent_state_mut();
                                    // The task has finished, so drop its join
                                    // handle before signalling `Returned`.
                                    state.get_mut(task)?.join_handle.take();
                                    Waitable::Host(task).set_event(
                                        state,
                                        Some(Event::Subtask {
                                            status: Status::Returned,
                                        }),
                                    )
                                }))),
                            );
                            Ok(())
                        })
                    });

                store.0.concurrent_state_mut().push_future(future);
                // Allocate a subtask handle in the caller instance's handle
                // table and record it on the host task.
                let handle = store
                    .0
                    .handle_table(RuntimeInstance {
                        instance: self.id().instance(),
                        index: caller_instance,
                    })
                    .subtask_insert_host(task.rep())?;
                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
                log::trace!(
                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
                );
                Some(handle)
            }
        })
    }
2762
2763    /// Implements the `task.return` intrinsic, lifting the result for the
2764    /// current guest task.
2765    pub(crate) fn task_return(
2766        self,
2767        store: &mut dyn VMStore,
2768        caller: RuntimeComponentInstanceIndex,
2769        ty: TypeTupleIndex,
2770        options: OptionsIndex,
2771        storage: &[ValRaw],
2772    ) -> Result<()> {
2773        self.id().get(store).check_may_leave(caller)?;
2774        let state = store.concurrent_state_mut();
2775        let guest_thread = state.guest_thread.unwrap();
2776        let lift = state
2777            .get_mut(guest_thread.task)?
2778            .lift_result
2779            .take()
2780            .ok_or_else(|| {
2781                format_err!("`task.return` or `task.cancel` called more than once for current task")
2782            })?;
2783        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2784
2785        let CanonicalOptions {
2786            string_encoding,
2787            data_model,
2788            ..
2789        } = &self.id().get(store).component().env_component().options[options];
2790
2791        let invalid = ty != lift.ty
2792            || string_encoding != &lift.string_encoding
2793            || match data_model {
2794                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2795                    Some(memory) => {
2796                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2797                        let actual = self.id().get(store).runtime_memory(memory);
2798                        expected != actual.as_ptr()
2799                    }
2800                    // Memory not specified, meaning it didn't need to be
2801                    // specified per validation, so not invalid.
2802                    None => false,
2803                },
2804                // Always invalid as this isn't supported.
2805                CanonicalOptionsDataModel::Gc { .. } => true,
2806            };
2807
2808        if invalid {
2809            bail!("invalid `task.return` signature and/or options for current task");
2810        }
2811
2812        log::trace!("task.return for {guest_thread:?}");
2813
2814        let result = (lift.lift)(store, storage)?;
2815        self.task_complete(
2816            store,
2817            guest_thread.task,
2818            result,
2819            Status::Returned,
2820            ValRaw::i32(0),
2821        )
2822    }
2823
2824    /// Implements the `task.cancel` intrinsic.
2825    pub(crate) fn task_cancel(
2826        self,
2827        store: &mut StoreOpaque,
2828        caller: RuntimeComponentInstanceIndex,
2829    ) -> Result<()> {
2830        self.id().get(store).check_may_leave(caller)?;
2831        let state = store.concurrent_state_mut();
2832        let guest_thread = state.guest_thread.unwrap();
2833        let task = state.get_mut(guest_thread.task)?;
2834        if !task.cancel_sent {
2835            bail!("`task.cancel` called by task which has not been cancelled")
2836        }
2837        _ = task.lift_result.take().ok_or_else(|| {
2838            format_err!("`task.return` or `task.cancel` called more than once for current task")
2839        })?;
2840
2841        assert!(task.result.is_none());
2842
2843        log::trace!("task.cancel for {guest_thread:?}");
2844
2845        self.task_complete(
2846            store,
2847            guest_thread.task,
2848            Box::new(DummyResult),
2849            Status::ReturnCancelled,
2850            ValRaw::i32(0),
2851        )
2852    }
2853
2854    /// Complete the specified guest task (i.e. indicate that it has either
2855    /// returned a (possibly empty) result or cancelled itself).
2856    ///
2857    /// This will return any resource borrows and notify any current or future
2858    /// waiters that the task has completed.
2859    fn task_complete(
2860        self,
2861        store: &mut StoreOpaque,
2862        guest_task: TableId<GuestTask>,
2863        result: Box<dyn Any + Send + Sync>,
2864        status: Status,
2865        post_return_arg: ValRaw,
2866    ) -> Result<()> {
2867        if store
2868            .concurrent_state_mut()
2869            .get_mut(guest_task)?
2870            .call_post_return_automatically()
2871        {
2872            let (calls, host_table, _, instance) =
2873                store.component_resource_state_with_instance(self);
2874            ResourceTables {
2875                calls,
2876                host_table: Some(host_table),
2877                guest: Some(instance.instance_states()),
2878            }
2879            .exit_call()?;
2880        } else {
2881            // As of this writing, the only scenario where `call_post_return_automatically`
2882            // would be false for a `GuestTask` is for host-to-guest calls using
2883            // `[Typed]Func::call_async`, in which case the `function_index`
2884            // should be a non-`None` value.
2885            let function_index = store
2886                .concurrent_state_mut()
2887                .get_mut(guest_task)?
2888                .function_index
2889                .unwrap();
2890            self.id()
2891                .get_mut(store)
2892                .post_return_arg_set(function_index, post_return_arg);
2893        }
2894
2895        let state = store.concurrent_state_mut();
2896        let task = state.get_mut(guest_task)?;
2897
2898        if let Caller::Host { tx, .. } = &mut task.caller {
2899            if let Some(tx) = tx.take() {
2900                _ = tx.send(result);
2901            }
2902        } else {
2903            task.result = Some(result);
2904            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2905        }
2906
2907        Ok(())
2908    }
2909
2910    /// Implements the `waitable-set.new` intrinsic.
2911    pub(crate) fn waitable_set_new(
2912        self,
2913        store: &mut StoreOpaque,
2914        caller_instance: RuntimeComponentInstanceIndex,
2915    ) -> Result<u32> {
2916        self.id().get_mut(store).check_may_leave(caller_instance)?;
2917        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2918        let handle = store
2919            .handle_table(RuntimeInstance {
2920                instance: self.id().instance(),
2921                index: caller_instance,
2922            })
2923            .waitable_set_insert(set.rep())?;
2924        log::trace!("new waitable set {set:?} (handle {handle})");
2925        Ok(handle)
2926    }
2927
2928    /// Implements the `waitable-set.drop` intrinsic.
2929    pub(crate) fn waitable_set_drop(
2930        self,
2931        store: &mut StoreOpaque,
2932        caller_instance: RuntimeComponentInstanceIndex,
2933        set: u32,
2934    ) -> Result<()> {
2935        self.id().get_mut(store).check_may_leave(caller_instance)?;
2936        let rep = store
2937            .handle_table(RuntimeInstance {
2938                instance: self.id().instance(),
2939                index: caller_instance,
2940            })
2941            .waitable_set_remove(set)?;
2942
2943        log::trace!("drop waitable set {rep} (handle {set})");
2944
2945        let set = store
2946            .concurrent_state_mut()
2947            .delete(TableId::<WaitableSet>::new(rep))?;
2948
2949        if !set.waiting.is_empty() {
2950            bail!("cannot drop waitable set with waiters");
2951        }
2952
2953        Ok(())
2954    }
2955
2956    /// Implements the `waitable.join` intrinsic.
2957    pub(crate) fn waitable_join(
2958        self,
2959        store: &mut StoreOpaque,
2960        caller_instance: RuntimeComponentInstanceIndex,
2961        waitable_handle: u32,
2962        set_handle: u32,
2963    ) -> Result<()> {
2964        let mut instance = self.id().get_mut(store);
2965        instance.check_may_leave(caller_instance)?;
2966        let waitable =
2967            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2968
2969        let set = if set_handle == 0 {
2970            None
2971        } else {
2972            let set = instance.instance_states().0[caller_instance]
2973                .handle_table()
2974                .waitable_set_rep(set_handle)?;
2975
2976            Some(TableId::<WaitableSet>::new(set))
2977        };
2978
2979        log::trace!(
2980            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2981        );
2982
2983        waitable.join(store.concurrent_state_mut(), set)
2984    }
2985
2986    /// Implements the `subtask.drop` intrinsic.
2987    pub(crate) fn subtask_drop(
2988        self,
2989        store: &mut StoreOpaque,
2990        caller_instance: RuntimeComponentInstanceIndex,
2991        task_id: u32,
2992    ) -> Result<()> {
2993        self.id().get_mut(store).check_may_leave(caller_instance)?;
2994        self.waitable_join(store, caller_instance, task_id, 0)?;
2995
2996        let (rep, is_host) = store
2997            .handle_table(RuntimeInstance {
2998                instance: self.id().instance(),
2999                index: caller_instance,
3000            })
3001            .subtask_remove(task_id)?;
3002
3003        let concurrent_state = store.concurrent_state_mut();
3004        let (waitable, expected_caller_instance, delete) = if is_host {
3005            let id = TableId::<HostTask>::new(rep);
3006            let task = concurrent_state.get_mut(id)?;
3007            if task.join_handle.is_some() {
3008                bail!("cannot drop a subtask which has not yet resolved");
3009            }
3010            (Waitable::Host(id), task.caller_instance, true)
3011        } else {
3012            let id = TableId::<GuestTask>::new(rep);
3013            let task = concurrent_state.get_mut(id)?;
3014            if task.lift_result.is_some() {
3015                bail!("cannot drop a subtask which has not yet resolved");
3016            }
3017            if let Caller::Guest { instance, .. } = &task.caller {
3018                (Waitable::Guest(id), *instance, task.exited)
3019            } else {
3020                unreachable!()
3021            }
3022        };
3023
3024        waitable.common(concurrent_state)?.handle = None;
3025
3026        if waitable.take_event(concurrent_state)?.is_some() {
3027            bail!("cannot drop a subtask with an undelivered event");
3028        }
3029
3030        if delete {
3031            waitable.delete_from(concurrent_state)?;
3032        }
3033
3034        // Since waitables can neither be passed between instances nor forged,
3035        // this should never fail unless there's a bug in Wasmtime, but we check
3036        // here to be sure:
3037        assert_eq!(expected_caller_instance, caller_instance);
3038        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3039        Ok(())
3040    }
3041
3042    /// Implements the `waitable-set.wait` intrinsic.
3043    pub(crate) fn waitable_set_wait(
3044        self,
3045        store: &mut StoreOpaque,
3046        caller: RuntimeComponentInstanceIndex,
3047        options: OptionsIndex,
3048        set: u32,
3049        payload: u32,
3050    ) -> Result<u32> {
3051        self.id().get(store).check_may_leave(caller)?;
3052
3053        if !self.options(store, options).async_ {
3054            // The caller may only call `waitable-set.wait` from an async task
3055            // (i.e. a task created via a call to an async export).
3056            // Otherwise, we'll trap.
3057            store.check_blocking()?;
3058        }
3059
3060        let &CanonicalOptions {
3061            cancellable,
3062            instance: caller_instance,
3063            ..
3064        } = &self.id().get(store).component().env_component().options[options];
3065        let rep = store
3066            .handle_table(RuntimeInstance {
3067                instance: self.id().instance(),
3068                index: caller_instance,
3069            })
3070            .waitable_set_rep(set)?;
3071
3072        self.waitable_check(
3073            store,
3074            cancellable,
3075            WaitableCheck::Wait,
3076            WaitableCheckParams {
3077                set: TableId::new(rep),
3078                options,
3079                payload,
3080            },
3081        )
3082    }
3083
3084    /// Implements the `waitable-set.poll` intrinsic.
3085    pub(crate) fn waitable_set_poll(
3086        self,
3087        store: &mut StoreOpaque,
3088        caller: RuntimeComponentInstanceIndex,
3089        options: OptionsIndex,
3090        set: u32,
3091        payload: u32,
3092    ) -> Result<u32> {
3093        self.id().get(store).check_may_leave(caller)?;
3094
3095        let &CanonicalOptions {
3096            cancellable,
3097            instance: caller_instance,
3098            ..
3099        } = &self.id().get(store).component().env_component().options[options];
3100        let rep = store
3101            .handle_table(RuntimeInstance {
3102                instance: self.id().instance(),
3103                index: caller_instance,
3104            })
3105            .waitable_set_rep(set)?;
3106
3107        self.waitable_check(
3108            store,
3109            cancellable,
3110            WaitableCheck::Poll,
3111            WaitableCheckParams {
3112                set: TableId::new(rep),
3113                options,
3114                payload,
3115            },
3116        )
3117    }
3118
3119    /// Implements the `thread.index` intrinsic.
3120    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3121        let thread_id = store
3122            .concurrent_state_mut()
3123            .guest_thread
3124            .ok_or_else(|| format_err!("no current thread"))?
3125            .thread;
3126        // The unwrap is safe because `instance_rep` must be `Some` by this point
3127        Ok(store
3128            .concurrent_state_mut()
3129            .get_mut(thread_id)?
3130            .instance_rep
3131            .unwrap())
3132    }
3133
3134    /// Implements the `thread.new-indirect` intrinsic.
3135    pub(crate) fn thread_new_indirect<T: 'static>(
3136        self,
3137        mut store: StoreContextMut<T>,
3138        runtime_instance: RuntimeComponentInstanceIndex,
3139        _func_ty_idx: TypeFuncIndex, // currently unused
3140        start_func_table_idx: RuntimeTableIndex,
3141        start_func_idx: u32,
3142        context: i32,
3143    ) -> Result<u32> {
3144        self.id().get(store.0).check_may_leave(runtime_instance)?;
3145
3146        log::trace!("creating new thread");
3147
3148        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3149        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3150        let callee = instance
3151            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3152            .ok_or_else(|| {
3153                format_err!("the start function index points to an uninitialized function")
3154            })?;
3155        if callee.type_index(store.0) != start_func_ty.type_index() {
3156            bail!(
3157                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3158            );
3159        }
3160
3161        let token = StoreToken::new(store.as_context_mut());
3162        let start_func = Box::new(
3163            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3164                let old_thread = store.set_thread(Some(guest_thread));
3165                log::trace!(
3166                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3167                );
3168
3169                store.maybe_push_call_context(guest_thread.task)?;
3170
3171                let mut store = token.as_context_mut(store);
3172                let mut params = [ValRaw::i32(context)];
3173                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3174                // on a separate fiber if we're running in an async store.
3175                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3176
3177                store.0.maybe_pop_call_context(guest_thread.task)?;
3178
3179                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3180                log::trace!("explicit thread {guest_thread:?} completed");
3181                let state = store.0.concurrent_state_mut();
3182                let task = state.get_mut(guest_thread.task)?;
3183                if task.threads.is_empty() && !task.returned_or_cancelled() {
3184                    bail!(Trap::NoAsyncResult);
3185                }
3186                store.0.set_thread(old_thread);
3187                let state = store.0.concurrent_state_mut();
3188                old_thread
3189                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3190                if state.get_mut(guest_thread.task)?.ready_to_delete() {
3191                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3192                }
3193                log::trace!("thread start: restored {old_thread:?} as current thread");
3194
3195                Ok(())
3196            },
3197        );
3198
3199        let state = store.0.concurrent_state_mut();
3200        let current_thread = state.guest_thread.unwrap();
3201        let parent_task = current_thread.task;
3202
3203        let new_thread = GuestThread::new_explicit(parent_task, start_func);
3204        let thread_id = state.push(new_thread)?;
3205        state.get_mut(parent_task)?.threads.insert(thread_id);
3206
3207        log::trace!("new thread with id {thread_id:?} created");
3208
3209        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3210    }
3211
3212    pub(crate) fn resume_suspended_thread(
3213        self,
3214        store: &mut StoreOpaque,
3215        runtime_instance: RuntimeComponentInstanceIndex,
3216        thread_idx: u32,
3217        high_priority: bool,
3218    ) -> Result<()> {
3219        let thread_id =
3220            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3221        let state = store.concurrent_state_mut();
3222        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3223        let thread = state.get_mut(guest_thread.thread)?;
3224
3225        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3226            GuestThreadState::NotStartedExplicit(start_func) => {
3227                log::trace!("starting thread {guest_thread:?}");
3228                let guest_call = WorkItem::GuestCall(GuestCall {
3229                    thread: guest_thread,
3230                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3231                        start_func(store, guest_thread)
3232                    })),
3233                });
3234                store
3235                    .concurrent_state_mut()
3236                    .push_work_item(guest_call, high_priority);
3237            }
3238            GuestThreadState::Suspended(fiber) => {
3239                log::trace!("resuming thread {thread_id:?} that was suspended");
3240                store
3241                    .concurrent_state_mut()
3242                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3243            }
3244            _ => {
3245                bail!("cannot resume thread which is not suspended");
3246            }
3247        }
3248        Ok(())
3249    }
3250
3251    fn add_guest_thread_to_instance_table(
3252        self,
3253        thread_id: TableId<GuestThread>,
3254        store: &mut StoreOpaque,
3255        runtime_instance: RuntimeComponentInstanceIndex,
3256    ) -> Result<u32> {
3257        let guest_id = store
3258            .handle_table(RuntimeInstance {
3259                instance: self.id().instance(),
3260                index: runtime_instance,
3261            })
3262            .guest_thread_insert(thread_id.rep())?;
3263        store
3264            .concurrent_state_mut()
3265            .get_mut(thread_id)?
3266            .instance_rep = Some(guest_id);
3267        Ok(guest_id)
3268    }
3269
3270    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3271    /// and `thread.switch-to` intrinsics.
3272    pub(crate) fn suspension_intrinsic(
3273        self,
3274        store: &mut StoreOpaque,
3275        caller: RuntimeComponentInstanceIndex,
3276        cancellable: bool,
3277        yielding: bool,
3278        to_thread: Option<u32>,
3279    ) -> Result<WaitResult> {
3280        self.id().get(store).check_may_leave(caller)?;
3281
3282        if to_thread.is_none() {
3283            let state = store.concurrent_state_mut();
3284            if yielding {
3285                // This is a `thread.yield` call
3286                if !state.may_block(state.guest_thread.unwrap().task) {
3287                    // The spec defines `thread.yield` to be a no-op in a
3288                    // non-blocking context, so we return immediately without giving
3289                    // any other thread a chance to run.
3290                    return Ok(WaitResult::Completed);
3291                }
3292            } else {
3293                // The caller may only call `thread.suspend` from an async task
3294                // (i.e. a task created via a call to an async export).
3295                // Otherwise, we'll trap.
3296                store.check_blocking()?;
3297            }
3298        }
3299
3300        // There could be a pending cancellation from a previous uncancellable wait
3301        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3302            return Ok(WaitResult::Cancelled);
3303        }
3304
3305        if let Some(thread) = to_thread {
3306            self.resume_suspended_thread(store, caller, thread, true)?;
3307        }
3308
3309        let state = store.concurrent_state_mut();
3310        let guest_thread = state.guest_thread.unwrap();
3311        let reason = if yielding {
3312            SuspendReason::Yielding {
3313                thread: guest_thread,
3314                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3315                // we're handling a `thread.yield-to` call; otherwise it would
3316                // panic if we called it in a non-blocking context.
3317                skip_may_block_check: to_thread.is_some(),
3318            }
3319        } else {
3320            SuspendReason::ExplicitlySuspending {
3321                thread: guest_thread,
3322                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3323                // we're handling a `thread.switch-to` call; otherwise it would
3324                // panic if we called it in a non-blocking context.
3325                skip_may_block_check: to_thread.is_some(),
3326            }
3327        };
3328
3329        store.suspend(reason)?;
3330
3331        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3332            Ok(WaitResult::Cancelled)
3333        } else {
3334            Ok(WaitResult::Completed)
3335        }
3336    }
3337
3338    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3339    fn waitable_check(
3340        self,
3341        store: &mut StoreOpaque,
3342        cancellable: bool,
3343        check: WaitableCheck,
3344        params: WaitableCheckParams,
3345    ) -> Result<u32> {
3346        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3347
3348        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3349
3350        let state = store.concurrent_state_mut();
3351        let task = state.get_mut(guest_thread.task)?;
3352
3353        // If we're waiting, and there are no events immediately available,
3354        // suspend the fiber until that changes.
3355        match &check {
3356            WaitableCheck::Wait => {
3357                let set = params.set;
3358
3359                if (task.event.is_none()
3360                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3361                    && state.get_mut(set)?.ready.is_empty()
3362                {
3363                    if cancellable {
3364                        let old = state
3365                            .get_mut(guest_thread.thread)?
3366                            .wake_on_cancel
3367                            .replace(set);
3368                        assert!(old.is_none());
3369                    }
3370
3371                    store.suspend(SuspendReason::Waiting {
3372                        set,
3373                        thread: guest_thread,
3374                        skip_may_block_check: false,
3375                    })?;
3376                }
3377            }
3378            WaitableCheck::Poll => {}
3379        }
3380
3381        log::trace!(
3382            "waitable check for {guest_thread:?}; set {:?}, part two",
3383            params.set
3384        );
3385
3386        // Deliver any pending events to the guest and return.
3387        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3388
3389        let (ordinal, handle, result) = match &check {
3390            WaitableCheck::Wait => {
3391                let (event, waitable) = event.unwrap();
3392                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3393                let (ordinal, result) = event.parts();
3394                (ordinal, handle, result)
3395            }
3396            WaitableCheck::Poll => {
3397                if let Some((event, waitable)) = event {
3398                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3399                    let (ordinal, result) = event.parts();
3400                    (ordinal, handle, result)
3401                } else {
3402                    log::trace!(
3403                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3404                        guest_thread.task,
3405                        params.set
3406                    );
3407                    let (ordinal, result) = Event::None.parts();
3408                    (ordinal, 0, result)
3409                }
3410            }
3411        };
3412        let memory = self.options_memory_mut(store, params.options);
3413        let ptr = func::validate_inbounds_dynamic(
3414            &CanonicalAbiInfo::POINTER_PAIR,
3415            memory,
3416            &ValRaw::u32(params.payload),
3417        )?;
3418        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3419        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3420        Ok(ordinal)
3421    }
3422
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// Resolves handle `task_id` in the table of `caller_instance` and
    /// attempts to cancel the corresponding host or guest subtask.
    ///
    /// Returns one of the following as a `u32`:
    ///
    /// - `Status::ReturnCancelled` if a host task's join handle was aborted;
    /// - `Status::StartCancelled` if a guest task had not yet lowered its
    ///   parameters and was removed from the pending set;
    /// - `BLOCKED` if `async_` is set and the guest task has still not
    ///   returned or been cancelled after the cancel event was sent;
    /// - otherwise the `Returned` / `ReturnCancelled` status carried by the
    ///   subtask's pending event.
    ///
    /// # Errors
    ///
    /// Fails if the handle cannot be resolved, if a synchronous call is made
    /// from a context where blocking is not allowed, or if the subtask has no
    /// terminal-status event left to deliver.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        self.id().get(store).check_may_leave(caller_instance)?;

        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        // Look up (without removing) the subtask's rep and whether it is a
        // host task or a guest task.
        let (rep, is_host) = store
            .handle_table(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .subtask_rep(task_id)?;
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { instance, .. } =
                &store.concurrent_state_mut().get_mut(id)?.caller
            {
                (Waitable::Guest(id), *instance)
            } else {
                unreachable!()
            }
        };
        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        assert_eq!(expected_caller_instance, caller_instance);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            // A host task that still has a join handle is aborted on the
            // spot; one without a handle falls through to the event check at
            // the bottom of this function.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                task.lower_params = None;
                task.lift_result = None;

                // Not yet started; cancel and remove from pending
                let instance = task.instance;
                let pending = &mut store.instance_state(instance).pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // If there were no pending threads for this task, we're in an error state
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                // Find the first thread of the task that registered a
                // `wake_on_cancel` set, wake it with high priority, and yield
                // so that it gets a chance to observe the cancellation.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        // Remove the thread from the set's waiters and turn
                        // its wait mode into the matching work item.
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        // Only one thread is woken per cancel request.
                        break;
                    }
                }

                // Re-fetch the task: the state may have changed while this
                // thread was suspended above.
                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // At this point the subtask is expected to carry a terminal-status
        // event; anything else means the status was already delivered.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3562
3563    pub(crate) fn context_get(
3564        self,
3565        store: &mut StoreOpaque,
3566        caller: RuntimeComponentInstanceIndex,
3567        slot: u32,
3568    ) -> Result<u32> {
3569        self.id().get(store).check_may_leave(caller)?;
3570        store.concurrent_state_mut().context_get(slot)
3571    }
3572
3573    pub(crate) fn context_set(
3574        self,
3575        store: &mut StoreOpaque,
3576        caller: RuntimeComponentInstanceIndex,
3577        slot: u32,
3578        value: u32,
3579    ) -> Result<()> {
3580        self.id().get(store).check_may_leave(caller)?;
3581        store.concurrent_state_mut().context_set(slot, value)
3582    }
3583}
3584
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// Every method takes the component `Instance` the call originates from; the
/// intrinsic methods additionally take the `caller` runtime component
/// instance index.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    ///
    /// SAFETY: all raw pointers must satisfy the requirements described in
    /// the trait-level documentation.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// SAFETY: all raw pointers must satisfy the requirements described in
    /// the trait-level documentation.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    ///
    /// SAFETY: all raw pointers must satisfy the requirements described in
    /// the trait-level documentation.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    ///
    /// `future` is the guest handle being written and `address` is a guest
    /// memory address (presumably the location of the value to write --
    /// confirm against the implementation).
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    ///
    /// `future` is the guest handle being read and `address` is a guest
    /// memory address (presumably the destination for the value read --
    /// confirm against the implementation).
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    ///
    /// `writer` is the guest handle of the writable end being dropped.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    ///
    /// `stream` is the guest handle being written; `address` and `count`
    /// describe the guest buffer (presumably `count` items starting at
    /// `address` -- confirm against the implementation).
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    ///
    /// `stream` is the guest handle being read; `address` and `count`
    /// describe the guest buffer (presumably room for `count` items starting
    /// at `address` -- confirm against the implementation).
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// Takes the same arguments as `stream_write` plus the payload's size and
    /// alignment.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// Takes the same arguments as `stream_read` plus the payload's size and
    /// alignment.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    ///
    /// `writer` is the guest handle of the writable end being dropped.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    ///
    /// `err_ctx_handle` names the error context and `debug_msg_address` is a
    /// guest memory address (presumably where the message is stored --
    /// confirm against the implementation).
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    ///
    /// `start_func_idx` indexes into the table `start_func_table_idx` to
    /// select the thread's start function, and `context` is the `i32` value
    /// associated with the new thread.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3755
3756/// SAFETY: See trait docs.
3757impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3758    unsafe fn prepare_call(
3759        &mut self,
3760        instance: Instance,
3761        memory: *mut VMMemoryDefinition,
3762        start: *mut VMFuncRef,
3763        return_: *mut VMFuncRef,
3764        caller_instance: RuntimeComponentInstanceIndex,
3765        callee_instance: RuntimeComponentInstanceIndex,
3766        task_return_type: TypeTupleIndex,
3767        callee_async: bool,
3768        string_encoding: u8,
3769        result_count_or_max_if_async: u32,
3770        storage: *mut ValRaw,
3771        storage_len: usize,
3772    ) -> Result<()> {
3773        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3774        // this method will have ensured that `storage` is a valid
3775        // pointer containing at least `storage_len` items.
3776        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3777
3778        unsafe {
3779            instance.prepare_call(
3780                StoreContextMut(self),
3781                start,
3782                return_,
3783                caller_instance,
3784                callee_instance,
3785                task_return_type,
3786                callee_async,
3787                memory,
3788                string_encoding,
3789                match result_count_or_max_if_async {
3790                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3791                        params,
3792                        has_result: false,
3793                    },
3794                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3795                        params,
3796                        has_result: true,
3797                    },
3798                    result_count => CallerInfo::Sync {
3799                        params,
3800                        result_count,
3801                    },
3802                },
3803            )
3804        }
3805    }
3806
3807    unsafe fn sync_start(
3808        &mut self,
3809        instance: Instance,
3810        callback: *mut VMFuncRef,
3811        callee: *mut VMFuncRef,
3812        param_count: u32,
3813        storage: *mut MaybeUninit<ValRaw>,
3814        storage_len: usize,
3815    ) -> Result<()> {
3816        unsafe {
3817            instance
3818                .start_call(
3819                    StoreContextMut(self),
3820                    callback,
3821                    ptr::null_mut(),
3822                    callee,
3823                    param_count,
3824                    1,
3825                    START_FLAG_ASYNC_CALLEE,
3826                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3827                    // this method will have ensured that `storage` is a valid
3828                    // pointer containing at least `storage_len` items.
3829                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3830                )
3831                .map(drop)
3832        }
3833    }
3834
3835    unsafe fn async_start(
3836        &mut self,
3837        instance: Instance,
3838        callback: *mut VMFuncRef,
3839        post_return: *mut VMFuncRef,
3840        callee: *mut VMFuncRef,
3841        param_count: u32,
3842        result_count: u32,
3843        flags: u32,
3844    ) -> Result<u32> {
3845        unsafe {
3846            instance.start_call(
3847                StoreContextMut(self),
3848                callback,
3849                post_return,
3850                callee,
3851                param_count,
3852                result_count,
3853                flags,
3854                None,
3855            )
3856        }
3857    }
3858
3859    fn future_write(
3860        &mut self,
3861        instance: Instance,
3862        caller: RuntimeComponentInstanceIndex,
3863        ty: TypeFutureTableIndex,
3864        options: OptionsIndex,
3865        future: u32,
3866        address: u32,
3867    ) -> Result<u32> {
3868        instance.id().get(self).check_may_leave(caller)?;
3869        instance
3870            .guest_write(
3871                StoreContextMut(self),
3872                caller,
3873                TransmitIndex::Future(ty),
3874                options,
3875                None,
3876                future,
3877                address,
3878                1,
3879            )
3880            .map(|result| result.encode())
3881    }
3882
3883    fn future_read(
3884        &mut self,
3885        instance: Instance,
3886        caller: RuntimeComponentInstanceIndex,
3887        ty: TypeFutureTableIndex,
3888        options: OptionsIndex,
3889        future: u32,
3890        address: u32,
3891    ) -> Result<u32> {
3892        instance.id().get(self).check_may_leave(caller)?;
3893        instance
3894            .guest_read(
3895                StoreContextMut(self),
3896                caller,
3897                TransmitIndex::Future(ty),
3898                options,
3899                None,
3900                future,
3901                address,
3902                1,
3903            )
3904            .map(|result| result.encode())
3905    }
3906
3907    fn stream_write(
3908        &mut self,
3909        instance: Instance,
3910        caller: RuntimeComponentInstanceIndex,
3911        ty: TypeStreamTableIndex,
3912        options: OptionsIndex,
3913        stream: u32,
3914        address: u32,
3915        count: u32,
3916    ) -> Result<u32> {
3917        instance.id().get(self).check_may_leave(caller)?;
3918        instance
3919            .guest_write(
3920                StoreContextMut(self),
3921                caller,
3922                TransmitIndex::Stream(ty),
3923                options,
3924                None,
3925                stream,
3926                address,
3927                count,
3928            )
3929            .map(|result| result.encode())
3930    }
3931
3932    fn stream_read(
3933        &mut self,
3934        instance: Instance,
3935        caller: RuntimeComponentInstanceIndex,
3936        ty: TypeStreamTableIndex,
3937        options: OptionsIndex,
3938        stream: u32,
3939        address: u32,
3940        count: u32,
3941    ) -> Result<u32> {
3942        instance.id().get(self).check_may_leave(caller)?;
3943        instance
3944            .guest_read(
3945                StoreContextMut(self),
3946                caller,
3947                TransmitIndex::Stream(ty),
3948                options,
3949                None,
3950                stream,
3951                address,
3952                count,
3953            )
3954            .map(|result| result.encode())
3955    }
3956
3957    fn future_drop_writable(
3958        &mut self,
3959        instance: Instance,
3960        caller: RuntimeComponentInstanceIndex,
3961        ty: TypeFutureTableIndex,
3962        writer: u32,
3963    ) -> Result<()> {
3964        instance.id().get(self).check_may_leave(caller)?;
3965        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3966    }
3967
3968    fn flat_stream_write(
3969        &mut self,
3970        instance: Instance,
3971        caller: RuntimeComponentInstanceIndex,
3972        ty: TypeStreamTableIndex,
3973        options: OptionsIndex,
3974        payload_size: u32,
3975        payload_align: u32,
3976        stream: u32,
3977        address: u32,
3978        count: u32,
3979    ) -> Result<u32> {
3980        instance.id().get(self).check_may_leave(caller)?;
3981        instance
3982            .guest_write(
3983                StoreContextMut(self),
3984                caller,
3985                TransmitIndex::Stream(ty),
3986                options,
3987                Some(FlatAbi {
3988                    size: payload_size,
3989                    align: payload_align,
3990                }),
3991                stream,
3992                address,
3993                count,
3994            )
3995            .map(|result| result.encode())
3996    }
3997
3998    fn flat_stream_read(
3999        &mut self,
4000        instance: Instance,
4001        caller: RuntimeComponentInstanceIndex,
4002        ty: TypeStreamTableIndex,
4003        options: OptionsIndex,
4004        payload_size: u32,
4005        payload_align: u32,
4006        stream: u32,
4007        address: u32,
4008        count: u32,
4009    ) -> Result<u32> {
4010        instance.id().get(self).check_may_leave(caller)?;
4011        instance
4012            .guest_read(
4013                StoreContextMut(self),
4014                caller,
4015                TransmitIndex::Stream(ty),
4016                options,
4017                Some(FlatAbi {
4018                    size: payload_size,
4019                    align: payload_align,
4020                }),
4021                stream,
4022                address,
4023                count,
4024            )
4025            .map(|result| result.encode())
4026    }
4027
4028    fn stream_drop_writable(
4029        &mut self,
4030        instance: Instance,
4031        caller: RuntimeComponentInstanceIndex,
4032        ty: TypeStreamTableIndex,
4033        writer: u32,
4034    ) -> Result<()> {
4035        instance.id().get(self).check_may_leave(caller)?;
4036        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4037    }
4038
4039    fn error_context_debug_message(
4040        &mut self,
4041        instance: Instance,
4042        caller: RuntimeComponentInstanceIndex,
4043        ty: TypeComponentLocalErrorContextTableIndex,
4044        options: OptionsIndex,
4045        err_ctx_handle: u32,
4046        debug_msg_address: u32,
4047    ) -> Result<()> {
4048        instance.id().get(self).check_may_leave(caller)?;
4049        instance.error_context_debug_message(
4050            StoreContextMut(self),
4051            ty,
4052            options,
4053            err_ctx_handle,
4054            debug_msg_address,
4055        )
4056    }
4057
4058    fn thread_new_indirect(
4059        &mut self,
4060        instance: Instance,
4061        caller: RuntimeComponentInstanceIndex,
4062        func_ty_idx: TypeFuncIndex,
4063        start_func_table_idx: RuntimeTableIndex,
4064        start_func_idx: u32,
4065        context: i32,
4066    ) -> Result<u32> {
4067        instance.thread_new_indirect(
4068            StoreContextMut(self),
4069            caller,
4070            func_ty_idx,
4071            start_func_table_idx,
4072            start_func_idx,
4073            context,
4074        )
4075    }
4076}
4077
/// A pinned, heap-allocated future that is `Send + 'static` and resolves to
/// `Result<()>`.
///
/// NOTE(review): presumably the shape in which host tasks are handed to the
/// event loop; the uses are outside this part of the file -- confirm.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4079
/// Represents the state of a pending host task.
struct HostTask {
    /// State shared by all kinds of waitables; see `WaitableCommon`.
    common: WaitableCommon,
    /// The component instance recorded as the caller when the task was
    /// created.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Optional join handle supplied at construction.
    ///
    /// NOTE(review): presumably used to cancel/abort the underlying host
    /// future -- confirm against `JoinHandle`.
    join_handle: Option<JoinHandle>,
}
4086
4087impl HostTask {
4088    fn new(
4089        caller_instance: RuntimeComponentInstanceIndex,
4090        join_handle: Option<JoinHandle>,
4091    ) -> Self {
4092        Self {
4093            common: WaitableCommon::default(),
4094            caller_instance,
4095            join_handle,
4096        }
4097    }
4098}
4099
4100impl TableDebug for HostTask {
4101    fn type_name() -> &'static str {
4102        "HostTask"
4103    }
4104}
4105
/// Boxed, `Send + Sync + 'static` closure type stored in
/// `GuestTask::callback`.
///
/// The closure receives the store, a runtime component instance index, an
/// `Event`, and a `u32`, and yields a `u32` on success.
///
/// NOTE(review): the `u32` argument is presumably the waitable handle the
/// event refers to and the returned `u32` the callback's packed return code
/// -- confirm against the call sites.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
4112
/// Represents the caller of a given guest task.
///
/// When a task is disposed (see `GuestTask::dispose`), its still-registered
/// subtasks are re-parented by overwriting their `caller`: a `Guest` caller
/// is copied as-is, while a `Host` caller is replaced by a `Host` value with
/// no `tx`, a clone of `exit_tx`, `host_future_present: false` and
/// `call_post_return_automatically: true`.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Channel to notify once all subtasks spawned by this caller have
        /// completed.
        ///
        /// Note that we'll never actually send anything to this channel;
        /// dropping it when the refcount goes to zero is sufficient to notify
        /// the receiver.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        ///
        /// `GuestTask::new` turns this into the task's initial
        /// `HostFutureState` (`Live` when true, `NotApplicable` otherwise).
        host_future_present: bool,
        /// If true, call `post-return` function (if any) automatically.
        call_post_return_automatically: bool,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling thread (and the task it belongs to).
        thread: QualifiedThreadId,
        /// The instance to use to enforce reentrance rules.
        ///
        /// Note that this might not be the same as the instance the caller task
        /// started executing in given that one or more synchronous guest->guest
        /// calls may have occurred involving multiple instances.
        instance: RuntimeComponentInstanceIndex,
    },
}
4144
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// The lifting closure itself.
    lift: RawLift,
    /// Tuple type index associated with the result.
    ///
    /// NOTE(review): presumably the type a `task.return` call is validated
    /// against -- confirm at the use site.
    ty: TypeTupleIndex,
    /// Linear memory definition to use, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use.
    string_encoding: StringEncoding,
}
4153
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task owning `thread` (the thread's `parent_task`; see
    /// `QualifiedThreadId::qualify`).
    task: TableId<GuestTask>,
    /// The table ID of the thread itself.
    thread: TableId<GuestThread>,
}
4163
4164impl QualifiedThreadId {
4165    fn qualify(
4166        state: &mut ConcurrentState,
4167        thread: TableId<GuestThread>,
4168    ) -> Result<QualifiedThreadId> {
4169        Ok(QualifiedThreadId {
4170            task: state.get_mut(thread)?.parent_task,
4171            thread,
4172        })
4173    }
4174}
4175
4176impl fmt::Debug for QualifiedThreadId {
4177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4178        f.debug_tuple("QualifiedThreadId")
4179            .field(&self.task.rep())
4180            .field(&self.thread.rep())
4181            .finish()
4182    }
4183}
4184
/// The execution state of a guest thread (stored in `GuestThread::state`).
enum GuestThreadState {
    /// Initial state of a thread created by `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created by `GuestThread::new_explicit`,
    /// holding the start function passed to it.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// NOTE(review): presumably the thread is currently executing -- confirm.
    Running,
    /// The thread is suspended; the variant owns its `StoreFiber`.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): exact meaning is not visible in this part of the file
    /// -- confirm.
    Pending,
    /// NOTE(review): presumably the thread has finished running -- confirm.
    Completed,
}
/// State of a single guest thread, owned by the guest task in `parent_task`.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    ///
    /// Both constructors leave this as `None`; it has to be filled in by the
    /// code that registers the thread.
    instance_rep: Option<u32>,
}
4210
4211impl GuestThread {
4212    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4213    /// handle.
4214    fn from_instance(
4215        state: Pin<&mut ComponentInstance>,
4216        caller_instance: RuntimeComponentInstanceIndex,
4217        guest_thread: u32,
4218    ) -> Result<TableId<Self>> {
4219        let rep = state.instance_states().0[caller_instance]
4220            .handle_table()
4221            .guest_thread_rep(guest_thread)?;
4222        Ok(TableId::new(rep))
4223    }
4224
4225    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4226        Self {
4227            context: [0; 2],
4228            parent_task,
4229            wake_on_cancel: None,
4230            state: GuestThreadState::NotStartedImplicit,
4231            instance_rep: None,
4232        }
4233    }
4234
4235    fn new_explicit(
4236        parent_task: TableId<GuestTask>,
4237        start_func: Box<
4238            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4239        >,
4240    ) -> Self {
4241        Self {
4242            context: [0; 2],
4243            parent_task,
4244            wake_on_cancel: None,
4245            state: GuestThreadState::NotStartedExplicit(start_func),
4246            instance_rep: None,
4247        }
4248    }
4249}
4250
4251impl TableDebug for GuestThread {
4252    fn type_name() -> &'static str {
4253        "GuestThread"
4254    }
4255}
4256
/// Tracks the lowered result of a sync-to-async call (see
/// `GuestTask::sync_result`).
enum SyncResult {
    /// No result has been produced yet; `take` yields `None`.
    NotProduced,
    /// A result has been produced and not yet taken; `take` yields it.
    Produced(Option<ValRaw>),
    /// `take` has already been called; calling it again panics.
    Taken,
}
4262
4263impl SyncResult {
4264    fn take(&mut self) -> Option<Option<ValRaw>> {
4265        match mem::replace(self, SyncResult::Taken) {
4266            SyncResult::NotProduced => None,
4267            SyncResult::Produced(val) => Some(val),
4268            SyncResult::Taken => {
4269                panic!("attempted to take a synchronous result that was already taken")
4270            }
4271        }
4272    }
4273}
4274
/// State of the host future associated with a guest task (see
/// `GuestTask::host_future_state`).
#[derive(Debug)]
enum HostFutureState {
    /// No host future has to be waited for: the task was called by a guest,
    /// or by the host with `host_future_present` unset.
    NotApplicable,
    /// A host future exists and has not been dropped; while in this state
    /// `GuestTask::ready_to_delete` returns `false`.
    Live,
    /// NOTE(review): presumably set once the host future has been dropped;
    /// the transition is outside this part of the file -- confirm.
    Dropped,
}
4281
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// Reset to `None` once the parameters have been lowered (see
    /// `already_lowered_parameters`).
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Reset to `None` once the task has returned or exited (see
    /// `returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// A place to stash the call context for managing resource borrows while
    /// switching between guest tasks.
    call_context: Option<CallContext>,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// Pending guest subtasks created by this task (directly or indirectly).
    ///
    /// This is used to re-parent subtasks which are still running when their
    /// parent task is disposed.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// Allocated in `GuestTask::new` and deleted again in
    /// `GuestTask::dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    ///
    /// This is separate from `common.event`, which holds the event pending
    /// for this task in its role as a waitable.
    event: Option<Event>,
    /// The `ExportIndex` of the guest function being called, if known.
    function_index: Option<ExportIndex>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    ///
    /// Must be empty before the task can be deleted or disposed.
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,
}
4340
4341impl GuestTask {
4342    fn already_lowered_parameters(&self) -> bool {
4343        // We reset `lower_params` after we lower the parameters
4344        self.lower_params.is_none()
4345    }
4346
4347    fn returned_or_cancelled(&self) -> bool {
4348        // We reset `lift_result` after we return or exit
4349        self.lift_result.is_none()
4350    }
4351
4352    fn ready_to_delete(&self) -> bool {
4353        let threads_completed = self.threads.is_empty();
4354        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4355        let pending_completion_event = matches!(
4356            self.common.event,
4357            Some(Event::Subtask {
4358                status: Status::Returned | Status::ReturnCancelled
4359            })
4360        );
4361        let ready = threads_completed
4362            && !has_sync_result
4363            && !pending_completion_event
4364            && !matches!(self.host_future_state, HostFutureState::Live);
4365        log::trace!(
4366            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4367            threads_completed,
4368            has_sync_result,
4369            pending_completion_event,
4370            self.host_future_state
4371        );
4372        ready
4373    }
4374
4375    fn new(
4376        state: &mut ConcurrentState,
4377        lower_params: RawLower,
4378        lift_result: LiftResult,
4379        caller: Caller,
4380        callback: Option<CallbackFn>,
4381        component_instance: Instance,
4382        instance: RuntimeComponentInstanceIndex,
4383        async_function: bool,
4384    ) -> Result<Self> {
4385        let sync_call_set = state.push(WaitableSet::default())?;
4386        let host_future_state = match &caller {
4387            Caller::Guest { .. } => HostFutureState::NotApplicable,
4388            Caller::Host {
4389                host_future_present,
4390                ..
4391            } => {
4392                if *host_future_present {
4393                    HostFutureState::Live
4394                } else {
4395                    HostFutureState::NotApplicable
4396                }
4397            }
4398        };
4399        Ok(Self {
4400            common: WaitableCommon::default(),
4401            lower_params: Some(lower_params),
4402            lift_result: Some(lift_result),
4403            result: None,
4404            callback,
4405            caller,
4406            call_context: Some(CallContext::default()),
4407            sync_result: SyncResult::NotProduced,
4408            cancel_sent: false,
4409            starting_sent: false,
4410            subtasks: HashSet::new(),
4411            sync_call_set,
4412            instance: RuntimeInstance {
4413                instance: component_instance.id().instance(),
4414                index: instance,
4415            },
4416            event: None,
4417            function_index: None,
4418            exited: false,
4419            threads: HashSet::new(),
4420            host_future_state,
4421            async_function,
4422        })
4423    }
4424
4425    /// Dispose of this guest task, reparenting any pending subtasks to the
4426    /// caller.
4427    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4428        // If there are not-yet-delivered completion events for subtasks in
4429        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4430        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4431            if let Some(Event::Subtask {
4432                status: Status::Returned | Status::ReturnCancelled,
4433            }) = waitable.common(state)?.event
4434            {
4435                waitable.delete_from(state)?;
4436            }
4437        }
4438
4439        assert!(self.threads.is_empty());
4440
4441        state.delete(self.sync_call_set)?;
4442
4443        // Reparent any pending subtasks to the caller.
4444        match &self.caller {
4445            Caller::Guest {
4446                thread,
4447                instance: runtime_instance,
4448            } => {
4449                let task_mut = state.get_mut(thread.task)?;
4450                let present = task_mut.subtasks.remove(&me);
4451                assert!(present);
4452
4453                for subtask in &self.subtasks {
4454                    task_mut.subtasks.insert(*subtask);
4455                }
4456
4457                for subtask in &self.subtasks {
4458                    state.get_mut(*subtask)?.caller = Caller::Guest {
4459                        thread: *thread,
4460                        instance: *runtime_instance,
4461                    };
4462                }
4463            }
4464            Caller::Host { exit_tx, .. } => {
4465                for subtask in &self.subtasks {
4466                    state.get_mut(*subtask)?.caller = Caller::Host {
4467                        tx: None,
4468                        // Clone `exit_tx` to ensure that it is only dropped
4469                        // once all transitive subtasks of the host call have
4470                        // exited:
4471                        exit_tx: exit_tx.clone(),
4472                        host_future_present: false,
4473                        call_post_return_automatically: true,
4474                    };
4475                }
4476            }
4477        }
4478
4479        for subtask in self.subtasks {
4480            if state.get_mut(subtask)?.exited {
4481                Waitable::Guest(subtask).delete_from(state)?;
4482            }
4483        }
4484
4485        Ok(())
4486    }
4487
4488    fn call_post_return_automatically(&self) -> bool {
4489        matches!(
4490            self.caller,
4491            Caller::Guest { .. }
4492                | Caller::Host {
4493                    call_post_return_automatically: true,
4494                    ..
4495                }
4496        )
4497    }
4498}
4499
4500impl TableDebug for GuestTask {
4501    fn type_name() -> &'static str {
4502        "GuestTask"
4503    }
4504}
4505
/// Represents state common to all kinds of waitables.
///
/// The derived `Default` yields a waitable with no pending event, no set and
/// no guest handle.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    ///
    /// Written by `Waitable::set_event` and moved out by
    /// `Waitable::take_event`.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    ///
    /// Maintained by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4516
/// Represents a Component Model Async `waitable`.
///
/// This is a small `Copy` identifier (a typed table ID); the state itself
/// lives in the `ConcurrentState` tables and is reached through the methods
/// below.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4527
4528impl Waitable {
4529    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4530    /// handle.
4531    fn from_instance(
4532        state: Pin<&mut ComponentInstance>,
4533        caller_instance: RuntimeComponentInstanceIndex,
4534        waitable: u32,
4535    ) -> Result<Self> {
4536        use crate::runtime::vm::component::Waitable;
4537
4538        let (waitable, kind) = state.instance_states().0[caller_instance]
4539            .handle_table()
4540            .waitable_rep(waitable)?;
4541
4542        Ok(match kind {
4543            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4544            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4545            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4546        })
4547    }
4548
4549    /// Retrieve the host-visible identifier for this `Waitable`.
4550    fn rep(&self) -> u32 {
4551        match self {
4552            Self::Host(id) => id.rep(),
4553            Self::Guest(id) => id.rep(),
4554            Self::Transmit(id) => id.rep(),
4555        }
4556    }
4557
4558    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4559    /// remove it from any set it may currently belong to (when `set` is
4560    /// `None`).
4561    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4562        log::trace!("waitable {self:?} join set {set:?}",);
4563
4564        let old = mem::replace(&mut self.common(state)?.set, set);
4565
4566        if let Some(old) = old {
4567            match *self {
4568                Waitable::Host(id) => state.remove_child(id, old),
4569                Waitable::Guest(id) => state.remove_child(id, old),
4570                Waitable::Transmit(id) => state.remove_child(id, old),
4571            }?;
4572
4573            state.get_mut(old)?.ready.remove(self);
4574        }
4575
4576        if let Some(set) = set {
4577            match *self {
4578                Waitable::Host(id) => state.add_child(id, set),
4579                Waitable::Guest(id) => state.add_child(id, set),
4580                Waitable::Transmit(id) => state.add_child(id, set),
4581            }?;
4582
4583            if self.common(state)?.event.is_some() {
4584                self.mark_ready(state)?;
4585            }
4586        }
4587
4588        Ok(())
4589    }
4590
4591    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4592    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4593        Ok(match self {
4594            Self::Host(id) => &mut state.get_mut(*id)?.common,
4595            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4596            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4597        })
4598    }
4599
4600    /// Set or clear the pending event for this waitable and either deliver it
4601    /// to the first waiter, if any, or mark it as ready to be delivered to the
4602    /// next waiter that arrives.
4603    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4604        log::trace!("set event for {self:?}: {event:?}");
4605        self.common(state)?.event = event;
4606        self.mark_ready(state)
4607    }
4608
4609    /// Take the pending event from this waitable, leaving `None` in its place.
4610    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4611        let common = self.common(state)?;
4612        let event = common.event.take();
4613        if let Some(set) = self.common(state)?.set {
4614            state.get_mut(set)?.ready.remove(self);
4615        }
4616
4617        Ok(event)
4618    }
4619
4620    /// Deliver the current event for this waitable to the first waiter, if any,
4621    /// or else mark it as ready to be delivered to the next waiter that
4622    /// arrives.
4623    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4624        if let Some(set) = self.common(state)?.set {
4625            state.get_mut(set)?.ready.insert(*self);
4626            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4627                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4628                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4629
4630                let item = match mode {
4631                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4632                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4633                        thread,
4634                        kind: GuestCallKind::DeliverEvent {
4635                            instance,
4636                            set: Some(set),
4637                        },
4638                    }),
4639                };
4640                state.push_high_priority(item);
4641            }
4642        }
4643        Ok(())
4644    }
4645
4646    /// Remove this waitable from the instance's rep table.
4647    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4648        match self {
4649            Self::Host(task) => {
4650                log::trace!("delete host task {task:?}");
4651                state.delete(*task)?;
4652            }
4653            Self::Guest(task) => {
4654                log::trace!("delete guest task {task:?}");
4655                state.delete(*task)?.dispose(state, *task)?;
4656            }
4657            Self::Transmit(task) => {
4658                state.delete(*task)?;
4659            }
4660        }
4661
4662        Ok(())
4663    }
4664}
4665
4666impl fmt::Debug for Waitable {
4667    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4668        match self {
4669            Self::Host(id) => write!(f, "{id:?}"),
4670            Self::Guest(id) => write!(f, "{id:?}"),
4671            Self::Transmit(id) => write!(f, "{id:?}"),
4672        }
4673    }
4674}
4675
4676/// Represents a Component Model Async `waitable-set`.
///
/// Both collections are ordered: `Waitable::mark_ready` wakes waiters with
/// `pop_first`, so the waiter with the smallest `QualifiedThreadId` is
/// scheduled first.
4677#[derive(Default)]
4678struct WaitableSet {
4679    /// Which waitables in this set have pending events, if any.
    ///
    /// Entries are added by `Waitable::mark_ready` and removed again by
    /// `Waitable::take_event` or when the waitable leaves the set.
4680    ready: BTreeSet<Waitable>,
4681    /// Which guest threads are currently waiting on this set, if any.
    ///
    /// The `WaitMode` records how a waiter is resumed: by resuming its fiber
    /// or by queueing a guest call which delivers the event to its callback.
4682    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4683}
4684
// Gives `WaitableSet` table entries a human-readable type name.
// NOTE(review): presumably used when debug-formatting table ids -- confirm
// against the `TableDebug` trait definition.
4685impl TableDebug for WaitableSet {
    /// Returns the literal name `"WaitableSet"`.
4686    fn type_name() -> &'static str {
4687        "WaitableSet"
4688    }
4689}
4690
4691/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure is called at most once with the store and the (possibly
/// uninitialized) buffer of core Wasm values it is expected to fill in.
/// `for_any_lower` can be used to coax closures into this shape.
4692type RawLower =
4693    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4694
4695/// Type-erased closure to lift the result for a guest task.
///
/// The closure is called at most once with the store and the core Wasm result
/// values, and returns the lifted result boxed as `dyn Any` (the same shape
/// as `LiftedResult`).  `for_any_lift` can be used to coax closures into this
/// shape.
4696type RawLift = Box<
4697    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4698>;
4699
4700/// Type-erased result of a guest task which may be downcast to the expected
4701/// type by a host caller (or simply ignored in the case of a guest caller; see
4702/// `DummyResult`).
///
/// `queue_call` performs that downcast for host callers and panics if the
/// boxed value is not of the expected type.
4703type LiftedResult = Box<dyn Any + Send + Sync>;
4704
4705/// Used to return a result from a `LiftFn` when the actual result has already
4706/// been lowered to a guest task's stack and linear memory.
///
/// This is a unit struct: it carries no data and only serves as a
/// placeholder value.
4707struct DummyResult;
4708
4709/// Represents the Component Model Async state of a (sub-)component instance.
4710#[derive(Default)]
4711pub struct ConcurrentInstanceState {
4712    /// Whether backpressure is set for this instance (enabled if >0)
    ///
    /// This is a counter rather than a flag; it starts at zero (disabled).
4713    backpressure: u16,
4714    /// Whether this instance can be entered
    ///
    /// Note the polarity: judging by the name and by the `pending` docs
    /// below, `true` means the instance must *not* be entered right now.
4715    do_not_enter: bool,
4716    /// Pending calls for this instance which require `Self::backpressure` to be
4717    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
    ///
    /// NOTE(review): `backpressure` is a `u16` counter, so "to be `true`"
    /// above presumably should read "to be zero (i.e. disabled)" -- confirm
    /// against the code which drains this map.
4718    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4719}
4720
4721impl ConcurrentInstanceState {
4722    pub fn pending_is_empty(&self) -> bool {
4723        self.pending.is_empty()
4724    }
4725}
4726
4727/// Represents the Component Model Async state of a store.
4728pub struct ConcurrentState {
4729    /// The currently running guest thread, if any.
4730    guest_thread: Option<QualifiedThreadId>,
4731
4732    /// The set of pending host and background tasks, if any.
4733    ///
4734    /// See `StoreContextMut::poll_until` (the event loop) for where we temporarily take this
4735    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    ///
    /// `take_fibers_and_futures` takes it out for good, leaving `None`.
4736    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4737    /// The table of waitables, waitable sets, etc.
4738    table: AlwaysMut<ResourceTable>,
4739    /// The "high priority" work queue for this store's event loop.
    ///
    /// `push_future` and `Waitable::mark_ready` enqueue their work items
    /// here.
4740    high_priority: Vec<WorkItem>,
4741    /// The "low priority" work queue for this store's event loop.
4742    low_priority: Vec<WorkItem>,
4743    /// A place to stash the reason a fiber is suspending so that the code which
4744    /// resumed it will know under what conditions the fiber should be resumed
4745    /// again.
4746    suspend_reason: Option<SuspendReason>,
4747    /// A cached fiber which is waiting for work to do.
4748    ///
4749    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4750    worker: Option<StoreFiber<'static>>,
4751    /// A place to stash the work item for which we're resuming a worker fiber.
4752    worker_item: Option<WorkerItem>,
4753
4754    /// Reference counts for all component error contexts
4755    ///
4756    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
4757    /// (sub)component ref counts as tracked by `error_context_tables`, for
4758    /// example when the host holds one or more references to error contexts.
4759    ///
4760    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4761    /// component-wide representation) of the index into concurrent state for a given
4762    /// stored `ErrorContext`.
4763    ///
4764    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4765    /// as a `TableId<ErrorContextState>`.
4766    global_error_context_ref_counts:
4767        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4768}
4769
4770impl Default for ConcurrentState {
4771    fn default() -> Self {
4772        Self {
4773            guest_thread: None,
4774            table: AlwaysMut::new(ResourceTable::new()),
4775            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4776            high_priority: Vec::new(),
4777            low_priority: Vec::new(),
4778            suspend_reason: None,
4779            worker: None,
4780            worker_item: None,
4781            global_error_context_ref_counts: BTreeMap::new(),
4782        }
4783    }
4784}
4785
4786impl ConcurrentState {
4787    /// Take ownership of any fibers and futures owned by this object.
4788    ///
4789    /// This should be used when disposing of the `Store` containing this object
4790    /// in order to gracefully resolve any and all fibers using
4791    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4792    /// use-after-free bugs due to fibers which may still have access to the
4793    /// `Store`.
4794    ///
4795    /// Additionally, the futures collected with this function should be dropped
4796    /// within a `tls::set` call, which will ensure than any futures closing
4797    /// over an `&Accessor` will have access to the store when dropped, allowing
4798    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4799    /// panicking.
4800    ///
4801    /// Note that this will leave the object in an inconsistent and unusable
4802    /// state, so it should only be used just prior to dropping it.
4803    pub(crate) fn take_fibers_and_futures(
4804        &mut self,
4805        fibers: &mut Vec<StoreFiber<'static>>,
4806        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4807    ) {
4808        for entry in self.table.get_mut().iter_mut() {
4809            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4810                for mode in mem::take(&mut set.waiting).into_values() {
4811                    if let WaitMode::Fiber(fiber) = mode {
4812                        fibers.push(fiber);
4813                    }
4814                }
4815            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4816                if let GuestThreadState::Suspended(fiber) =
4817                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4818                {
4819                    fibers.push(fiber);
4820                }
4821            }
4822        }
4823
4824        if let Some(fiber) = self.worker.take() {
4825            fibers.push(fiber);
4826        }
4827
4828        let mut take_items = |list| {
4829            for item in mem::take(list) {
4830                match item {
4831                    WorkItem::ResumeFiber(fiber) => {
4832                        fibers.push(fiber);
4833                    }
4834                    WorkItem::PushFuture(future) => {
4835                        self.futures
4836                            .get_mut()
4837                            .as_mut()
4838                            .unwrap()
4839                            .push(future.into_inner());
4840                    }
4841                    _ => {}
4842                }
4843            }
4844        };
4845
4846        take_items(&mut self.high_priority);
4847        take_items(&mut self.low_priority);
4848
4849        if let Some(them) = self.futures.get_mut().take() {
4850            futures.push(them);
4851        }
4852    }
4853
4854    fn push<V: Send + Sync + 'static>(
4855        &mut self,
4856        value: V,
4857    ) -> Result<TableId<V>, ResourceTableError> {
4858        self.table.get_mut().push(value).map(TableId::from)
4859    }
4860
4861    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4862        self.table.get_mut().get_mut(&Resource::from(id))
4863    }
4864
4865    pub fn add_child<T: 'static, U: 'static>(
4866        &mut self,
4867        child: TableId<T>,
4868        parent: TableId<U>,
4869    ) -> Result<(), ResourceTableError> {
4870        self.table
4871            .get_mut()
4872            .add_child(Resource::from(child), Resource::from(parent))
4873    }
4874
4875    pub fn remove_child<T: 'static, U: 'static>(
4876        &mut self,
4877        child: TableId<T>,
4878        parent: TableId<U>,
4879    ) -> Result<(), ResourceTableError> {
4880        self.table
4881            .get_mut()
4882            .remove_child(Resource::from(child), Resource::from(parent))
4883    }
4884
4885    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4886        self.table.get_mut().delete(Resource::from(id))
4887    }
4888
4889    fn push_future(&mut self, future: HostTaskFuture) {
4890        // Note that we can't directly push to `ConcurrentState::futures` here
4891        // since this may be called from a future that's being polled inside
4892        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4893        // so it has exclusive access while polling it.  Therefore, we push a
4894        // work item to the "high priority" queue, which will actually push to
4895        // `ConcurrentState::futures` later.
4896        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4897    }
4898
4899    fn push_high_priority(&mut self, item: WorkItem) {
4900        log::trace!("push high priority: {item:?}");
4901        self.high_priority.push(item);
4902    }
4903
4904    fn push_low_priority(&mut self, item: WorkItem) {
4905        log::trace!("push low priority: {item:?}");
4906        self.low_priority.push(item);
4907    }
4908
4909    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4910        if high_priority {
4911            self.push_high_priority(item);
4912        } else {
4913            self.push_low_priority(item);
4914        }
4915    }
4916
4917    /// Determine whether the instance associated with the specified guest task
4918    /// may be entered (i.e. is not already on the async call stack).
4919    ///
4920    /// This is an additional check on top of the "may_enter" instance flag;
4921    /// it's needed because async-lifted exports with callback functions must
4922    /// not call their own instances directly or indirectly, and due to the
4923    /// "stackless" nature of callback-enabled guest tasks this may happen even
4924    /// if there are no activation records on the stack (i.e. the "may_enter"
4925    /// field is `true`) for that instance.
4926    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4927        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4928
4929        // Walk the task tree back to the root, looking for potential
4930        // reentrance.
4931        //
4932        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4933        // such that each bit represents and instance which has been entered by
4934        // that task or an ancestor of that task, in which case this would be a
4935        // constant time check.
4936        loop {
4937            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4938                Caller::Host { .. } => break true,
4939                Caller::Guest { thread, instance } => {
4940                    if *instance == guest_instance.index {
4941                        break false;
4942                    } else {
4943                        *thread
4944                    }
4945                }
4946            };
4947            guest_task = next_thread.task;
4948        }
4949    }
4950
4951    /// Implements the `context.get` intrinsic.
4952    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4953        let thread = self.guest_thread.unwrap();
4954        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4955        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4956        Ok(val)
4957    }
4958
4959    /// Implements the `context.set` intrinsic.
4960    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4961        let thread = self.guest_thread.unwrap();
4962        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4963        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4964        Ok(())
4965    }
4966
4967    /// Returns whether there's a pending cancellation on the current guest thread,
4968    /// consuming the event if so.
4969    fn take_pending_cancellation(&mut self) -> bool {
4970        let thread = self.guest_thread.unwrap();
4971        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4972            assert!(matches!(event, Event::Cancelled));
4973            true
4974        } else {
4975            false
4976        }
4977    }
4978
4979    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4980        if self.may_block(task) {
4981            Ok(())
4982        } else {
4983            Err(Trap::CannotBlockSyncTask.into())
4984        }
4985    }
4986
4987    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4988        let task = self.get_mut(task).unwrap();
4989        task.async_function || task.returned_or_cancelled()
4990    }
4991}
4992
4993/// Provide a type hint to compiler about the shape of a parameter lower
4994/// closure.
4995fn for_any_lower<
4996    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4997>(
4998    fun: F,
4999) -> F {
5000    fun
5001}
5002
5003/// Provide a type hint to compiler about the shape of a result lift closure.
5004fn for_any_lift<
5005    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5006>(
5007    fun: F,
5008) -> F {
5009    fun
5010}
5011
5012/// Wrap the specified future in a `poll_fn` which asserts that the future is
5013/// only polled from the event loop of the specified `Store`.
5014///
5015/// See `StoreContextMut::run_concurrent` for details.
5016fn checked<F: Future + Send + 'static>(
5017    id: StoreId,
5018    fut: F,
5019) -> impl Future<Output = F::Output> + Send + 'static {
5020    async move {
5021        let mut fut = pin!(fut);
5022        future::poll_fn(move |cx| {
5023            let message = "\
5024                `Future`s which depend on asynchronous component tasks, streams, or \
5025                futures to complete may only be polled from the event loop of the \
5026                store to which they belong.  Please use \
5027                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5028            ";
5029            tls::try_get(|store| {
5030                let matched = match store {
5031                    tls::TryGet::Some(store) => store.id() == id,
5032                    tls::TryGet::Taken | tls::TryGet::None => false,
5033                };
5034
5035                if !matched {
5036                    panic!("{message}")
5037                }
5038            });
5039            fut.as_mut().poll(cx)
5040        })
5041        .await
5042    }
5043}
5044
5045/// Assert that `StoreContextMut::run_concurrent` has not been called from
5046/// within an store's event loop.
5047fn check_recursive_run() {
5048    tls::try_get(|store| {
5049        if !matches!(store, tls::TryGet::None) {
5050            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5051        }
5052    });
5053}
5054
/// Split a packed callback return value into `(low, high)`, where `low` is
/// the value of its four least significant bits and `high` is the remaining
/// 28 bits shifted down.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5058
5059/// Helper struct for packaging parameters to be passed to
5060/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5061/// `waitable-set.poll`.
5062struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
5063    set: TableId<WaitableSet>,
    /// Index of the options associated with the intrinsic call.
    /// NOTE(review): presumably the canonical ABI options of the calling
    /// intrinsic -- confirm against `waitable_check`.
5064    options: OptionsIndex,
    /// NOTE(review): presumably the guest address to which the event payload
    /// is written -- confirm against `waitable_check`.
5065    payload: u32,
5066}
5067
5068/// Indicates whether `ComponentInstance::waitable_check` is being called for
5069/// `waitable-set.wait` or `waitable-set.poll`.
5070enum WaitableCheck {
    /// The check is being made on behalf of `waitable-set.wait`.
5071    Wait,
    /// The check is being made on behalf of `waitable-set.poll`.
5072    Poll,
5073}
5074
5075/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// Pass it to `queue_call` to actually enqueue the call.
5076pub(crate) struct PreparedCall<R> {
5077    /// The guest export to be called
5078    handle: Func,
5079    /// The guest thread created by `prepare_call`
5080    thread: QualifiedThreadId,
5081    /// The number of lowered core Wasm parameters to pass to the call.
5082    param_count: usize,
5083    /// The `oneshot::Receiver` to which the result of the call will be
5084    /// delivered when it is available.
5085    rx: oneshot::Receiver<LiftedResult>,
5086    /// The `oneshot::Receiver` which will resolve when the task -- and any
5087    /// transitive subtasks -- have all exited.
5088    exit_rx: oneshot::Receiver<()>,
    /// Records the expected result type `R` without storing a value of it;
    /// `queue_call` downcasts the type-erased result to `R`.
5089    _phantom: PhantomData<R>,
5090}
5091
5092impl<R> PreparedCall<R> {
5093    /// Get a copy of the `TaskId` for this `PreparedCall`.
5094    pub(crate) fn task_id(&self) -> TaskId {
5095        TaskId {
5096            task: self.thread.task,
5097        }
5098    }
5099}
5100
5101/// Represents a task created by `prepare_call`.
///
/// Obtained from `PreparedCall::task_id`.
5102pub(crate) struct TaskId {
    /// Table id of the guest task this handle refers to.
5103    task: TableId<GuestTask>,
5104}
5105
5106impl TaskId {
5107    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5108    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5109    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5110    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5111    /// and delete the task when all threads are done.
5112    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5113        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5114        if !task.already_lowered_parameters() {
5115            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5116        } else {
5117            task.host_future_state = HostFutureState::Dropped;
5118            if task.ready_to_delete() {
5119                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5120            }
5121        }
5122        Ok(())
5123    }
5124}
5125
5126/// Prepare a call to the specified exported Wasm function, providing functions
5127/// for lowering the parameters and lifting the result.
5128///
5129/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5130/// loop, use `queue_call`.
5131pub(crate) fn prepare_call<T, R>(
5132    mut store: StoreContextMut<T>,
5133    handle: Func,
5134    param_count: usize,
5135    host_future_present: bool,
5136    call_post_return_automatically: bool,
5137    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5138    + Send
5139    + Sync
5140    + 'static,
5141    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5142    + Send
5143    + Sync
5144    + 'static,
5145) -> Result<PreparedCall<R>> {
5146    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5147
5148    let instance = handle.instance().id().get(store.0);
5149    let options = &instance.component().env_component().options[options];
5150    let ty = &instance.component().types()[ty];
5151    let async_function = ty.async_;
5152    let task_return_type = ty.results;
5153    let component_instance = raw_options.instance;
5154    let callback = options.callback.map(|i| instance.runtime_callback(i));
5155    let memory = options
5156        .memory()
5157        .map(|i| instance.runtime_memory(i))
5158        .map(SendSyncPtr::new);
5159    let string_encoding = options.string_encoding;
5160    let token = StoreToken::new(store.as_context_mut());
5161    let state = store.0.concurrent_state_mut();
5162
5163    let (tx, rx) = oneshot::channel();
5164    let (exit_tx, exit_rx) = oneshot::channel();
5165
5166    let mut task = GuestTask::new(
5167        state,
5168        Box::new(for_any_lower(move |store, params| {
5169            lower_params(handle, token.as_context_mut(store), params)
5170        })),
5171        LiftResult {
5172            lift: Box::new(for_any_lift(move |store, result| {
5173                lift_result(handle, store, result)
5174            })),
5175            ty: task_return_type,
5176            memory,
5177            string_encoding,
5178        },
5179        Caller::Host {
5180            tx: Some(tx),
5181            exit_tx: Arc::new(exit_tx),
5182            host_future_present,
5183            call_post_return_automatically,
5184        },
5185        callback.map(|callback| {
5186            let callback = SendSyncPtr::new(callback);
5187            let instance = handle.instance();
5188            Box::new(
5189                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5190                    let store = token.as_context_mut(store);
5191                    // SAFETY: Per the contract of `prepare_call`, the callback
5192                    // will remain valid at least as long is this task exists.
5193                    unsafe {
5194                        instance.call_callback(
5195                            store,
5196                            runtime_instance,
5197                            callback,
5198                            event,
5199                            handle,
5200                            call_post_return_automatically,
5201                        )
5202                    }
5203                },
5204            ) as CallbackFn
5205        }),
5206        handle.instance(),
5207        component_instance,
5208        async_function,
5209    )?;
5210    task.function_index = Some(handle.index());
5211
5212    let task = state.push(task)?;
5213    let thread = state.push(GuestThread::new_implicit(task))?;
5214    state.get_mut(task)?.threads.insert(thread);
5215
5216    Ok(PreparedCall {
5217        handle,
5218        thread: QualifiedThreadId { task, thread },
5219        param_count,
5220        rx,
5221        exit_rx,
5222        _phantom: PhantomData,
5223    })
5224}
5225
5226/// Queue a call previously prepared using `prepare_call` to be run as part of
5227/// the associated `ComponentInstance`'s event loop.
5228///
5229/// The returned future will resolve to the result once it is available, but
5230/// must only be polled via the instance's event loop. See
5231/// `StoreContextMut::run_concurrent` for details.
5232pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5233    mut store: StoreContextMut<T>,
5234    prepared: PreparedCall<R>,
5235) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5236    let PreparedCall {
5237        handle,
5238        thread,
5239        param_count,
5240        rx,
5241        exit_rx,
5242        ..
5243    } = prepared;
5244
5245    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5246
5247    Ok(checked(
5248        store.0.id(),
5249        rx.map(move |result| {
5250            result
5251                .map(|v| (*v.downcast().unwrap(), exit_rx))
5252                .map_err(crate::Error::from)
5253        }),
5254    ))
5255}
5256
5257/// Queue a call previously prepared using `prepare_call` to be run as part of
5258/// the associated `ComponentInstance`'s event loop.
5259fn queue_call0<T: 'static>(
5260    store: StoreContextMut<T>,
5261    handle: Func,
5262    guest_thread: QualifiedThreadId,
5263    param_count: usize,
5264) -> Result<()> {
5265    let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5266    let is_concurrent = raw_options.async_;
5267    let callback = raw_options.callback;
5268    let instance = handle.instance();
5269    let callee = handle.lifted_core_func(store.0);
5270    let post_return = handle.post_return_core_func(store.0);
5271    let callback = callback.map(|i| {
5272        let instance = instance.id().get(store.0);
5273        SendSyncPtr::new(instance.runtime_callback(i))
5274    });
5275
5276    log::trace!("queueing call {guest_thread:?}");
5277
5278    let instance_flags = if callback.is_none() {
5279        None
5280    } else {
5281        Some(flags)
5282    };
5283
5284    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5285    // (with signatures appropriate for this call) and will remain valid as
5286    // long as this instance is valid.
5287    unsafe {
5288        instance.queue_call(
5289            store,
5290            guest_thread,
5291            SendSyncPtr::new(callee),
5292            param_count,
5293            1,
5294            instance_flags,
5295            is_concurrent,
5296            callback,
5297            post_return.map(SendSyncPtr::new),
5298        )
5299    }
5300}