wasmtime/runtime/component/concurrent.rs

//! Runtime support for the Component Model Async ABI.
//!
//! This module and its submodules provide host runtime support for Component
//! Model Async features such as async-lifted exports, async-lowered imports,
//! streams, futures, and related intrinsics.  See [the Async
//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
//! for a high-level overview.
//!
//! At the core of this support is an event loop which schedules and switches
//! between guest tasks and any host tasks they create.  Each
//! `Store` will have at most one event loop running at any given
//! time, and that loop may be suspended and resumed by the host embedder using
//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
//! function contains the loop itself, while the
//! `StoreOpaque::concurrent_state` field holds its state.
//!
//! # Public API Overview
//!
//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
//!
//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
//! store, allowing any and all tasks belonging to that store to make
//! progress.
//!
//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
//! for the specified store.
//!
//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
//! `stream` which may be passed to the guest.  This takes a
//! `{Future,Stream}Producer` implementation which will be polled for items when
//! the consumer requests them.
//!
//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
//! connecting it to a `{Future,Stream}Consumer` which will consume any items
//! produced by the write end.
//!
//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
//!
//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
//! function with the linker.  That function will take an `Accessor` as its
//! first parameter, which provides access to the store between (but not across)
//! await points.
//!
//! - `Accessor::with`: Access the store and its associated data.
//!
//! - `Accessor::spawn`: Run a background task as part of the event loop for the
//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
//! in host functions.
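//!
//! # Example
//!
//! A minimal sketch of the top-level flow (assuming a component that exports a
//! function `"run"` taking a `u32`, and a hypothetical host data type `Ctx`
//! with a `Default` impl and a `next_value` method; error handling and setup
//! are abbreviated):
//!
//! ```ignore
//! use wasmtime::{Engine, Store};
//! use wasmtime::component::{Component, Linker};
//!
//! async fn example(engine: &Engine, component: &Component) -> wasmtime::Result<()> {
//!     let linker = Linker::<Ctx>::new(engine);
//!     let mut store = Store::new(engine, Ctx::default());
//!     let instance = linker.instantiate_async(&mut store, component).await?;
//!     let run = instance.get_typed_func::<(u32,), ()>(&mut store, "run")?;
//!
//!     // Drive the store's event loop while the async closure runs; guest
//!     // tasks, host tasks, and background tasks all make progress here.
//!     store
//!         .run_concurrent(async |accessor| -> wasmtime::Result<()> {
//!             // Synchronously access the store data between await points.
//!             let value = accessor.with(|mut access| access.get().next_value());
//!             // Start a host->guest call, creating a guest task.
//!             run.call_concurrent(accessor, (value,)).await?;
//!             Ok(())
//!         })
//!         .await??;
//!     Ok(())
//! }
//! ```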

use crate::component::func::{self, Func};
use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
use anyhow::{Context as _, Result, anyhow, bail};
use error_contexts::GlobalErrorContextRefCount;
use futures::channel::oneshot;
use futures::future::{self, Either, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
use std::any::Any;
use std::borrow::ToOwned;
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ops::DerefMut;
use std::pin::{Pin, pin};
use std::ptr::{self, NonNull};
use std::slice;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::vec::Vec;
use table::{TableDebug, TableId};
use wasmtime_environ::Trap;
use wasmtime_environ::component::{
    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
};

pub use abort::JoinHandle;
pub use futures_and_streams::{
    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
};
pub(crate) use futures_and_streams::{
    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
};

mod abort;
mod error_contexts;
mod futures_and_streams;
pub(crate) mod table;
pub(crate) mod tls;

/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
const BLOCKED: u32 = 0xffff_ffff;

/// Corresponds to `CallState` in the upstream spec.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and the optional `waitable` provided into a 32-bit
    /// result that the canonical ABI requires.
    ///
    /// The low 4 bits are reserved for the status while the upper 28 bits are
    /// the waitable, if present.
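    ///
    /// For example, under that layout (a sketch; this type is not necessarily
    /// reachable from outside the crate, so this is not a doctest):
    ///
    /// ```ignore
    /// // Status::Started is 1, so packing it with waitable handle 5 yields
    /// // (5 << 4) | 1 == 0x51.
    /// assert_eq!(Status::Started.pack(Some(5)), 0x51);
    /// // Status::Returned never carries a waitable.
    /// assert_eq!(Status::Returned.pack(None), 2);
    /// ```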
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or(0);
        assert!(waitable < (1 << 28));
        (waitable << 4) | (self as u32)
    }
}

/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
#[derive(Clone, Copy, Debug)]
enum Event {
    None,
    Cancelled,
    Subtask {
        status: Status,
    },
    StreamRead {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}

impl Event {
    /// Lower this event to core Wasm integers for delivery to the guest.
    ///
    /// Note that the waitable handle, if any, is assumed to be lowered
    /// separately.
    fn parts(self) -> (u32, u32) {
        const EVENT_NONE: u32 = 0;
        const EVENT_SUBTASK: u32 = 1;
        const EVENT_STREAM_READ: u32 = 2;
        const EVENT_STREAM_WRITE: u32 = 3;
        const EVENT_FUTURE_READ: u32 = 4;
        const EVENT_FUTURE_WRITE: u32 = 5;
        const EVENT_CANCELLED: u32 = 6;
        match self {
            Event::None => (EVENT_NONE, 0),
            Event::Cancelled => (EVENT_CANCELLED, 0),
            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
        }
    }
}

/// Corresponds to `CallbackCode` in the spec.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
    pub const POLL: u32 = 3;
}

/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;

/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    store: StoreContextMut<'a, T>,
    get_data: fn(&mut T) -> D::Data<'_>,
}

impl<'a, T, D> Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Creates a new [`Access`] from its component parts.
    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
        Self { store, get_data }
    }

    /// Get mutable access to the store data.
    pub fn data_mut(&mut self) -> &mut T {
        self.store.data_mut()
    }

    /// Get mutable access to the data projected from the store data
    /// (i.e. `D::Data`).
    pub fn get(&mut self) -> D::Data<'_> {
        (self.get_data)(self.data_mut())
    }

    /// Spawn a background task.
    ///
    /// See [`Accessor::spawn`] for details.
    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor {
            get_data: self.get_data,
            token: StoreToken::new(self.store.as_context_mut()),
        };
        self.store
            .as_context_mut()
            .spawn_with_accessor(accessor, task)
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }
}

impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}

impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}

/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they will panic and have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
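///
/// # Example
///
/// A rough sketch of how an `Accessor` is typically used inside a concurrent
/// host function or background task (the `Ctx` type with its `requests`
/// counter and the `some_async_work` function are hypothetical):
///
/// ```ignore
/// async fn handle(accessor: &Accessor<Ctx>) -> wasmtime::Result<String> {
///     // Synchronously borrow the store data between await points; the
///     // closure must finish within a single `Future::poll` call.
///     let id = accessor.with(|mut access| {
///         let ctx = access.get();
///         ctx.requests += 1;
///         ctx.requests
///     });
///
///     // It's fine to await here; the store is released while this future is
///     // suspended and re-acquired by the next `accessor.with` call.
///     some_async_work().await;
///
///     Ok(format!("request #{id}"))
/// }
/// ```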
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    token: StoreToken<T>,
    get_data: fn(&mut T) -> D::Data<'_>,
}

/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
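///
/// # Example
///
/// A small sketch of a helper that accepts any accessor for a given store data
/// type, regardless of its `D` projection (the `Ctx` type and its `counter`
/// field are hypothetical):
///
/// ```ignore
/// fn bump_counter(accessor: impl AsAccessor<Data = Ctx>) -> u64 {
///     // Works for any `Accessor<Ctx, D>`, whatever the `D` projection is,
///     // because `data_mut` always yields the underlying `&mut Ctx`.
///     accessor.as_accessor().with(|mut access| {
///         access.data_mut().counter += 1;
///         access.data_mut().counter
///     })
/// }
/// ```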
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}

impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}

impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}

// Note that it is intentional at this time that `Accessor` does not actually
// store `&mut T` or anything similar. This distinctly enables the `Accessor`
// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
// that matter). This is used to ergonomically simplify bindings where the
// majority of the time `Accessor` is closed over in a future which then needs
// to be `Send` and `Sync`. Making `Accessor` unconditionally `Send` and `Sync`
// avoids needing to write `T: Send` everywhere (you already have to write
// `T: 'static`...).
//
// Note as well that `Accessor` doesn't actually store its data at all. Instead
// it's more of a "proof" of what can be accessed from TLS. API design around
// `Accessor` and functions like `Linker::func_wrap_concurrent` are
// intentionally made to ensure that `Accessor` is ideally only used in the
// context that TLS variables are actually set. For example host functions are
// given `&Accessor`, not `Accessor`, and this prevents them from persisting
// the value outside of a future. Within the future the TLS variables are all
// guaranteed to be set while the future is being polled.
//
// Finally though this is not an ironclad guarantee, but nor does it need to be.
// The TLS APIs are designed to panic or otherwise model usage where they're
// called recursively or similar. It's hoped that code cannot be constructed to
// actually hit this at runtime but this is not a safety requirement at this
// time.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};

impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by the specified
    /// [`StoreToken`].
    ///
    /// The returned accessor projects the store's data via the identity
    /// function; use [`Accessor::with_getter`] to project to another type
    /// (e.g. a field of that data or a wrapper around it).
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}

impl<T, D> Accessor<T, D>
where
    D: HasData + ?Sized,
{
    /// Run the specified closure, passing it mutable access to the store.
    ///
    /// This function is one of the main building blocks of the [`Accessor`]
    /// type. This yields synchronous, blocking access to the store via an
    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
    /// providing the ability to access `D` via [`Access::get`]. Note that the
    /// `fun` here is given only temporary access to the store and `T`/`D`,
    /// meaning that the return value `R` here is not allowed to capture borrows
    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
    ///
    /// # Panics
    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example if `with` is called within `fun`,
    /// then this function will panic. It is up to the embedder to ensure that
    /// this does not happen.
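    ///
    /// # Example
    ///
    /// A brief sketch of copying data out of the store so it can be used after
    /// the closure returns (the `Ctx` type and its `name` field are
    /// hypothetical):
    ///
    /// ```ignore
    /// async fn greeting(accessor: &Accessor<Ctx>) -> String {
    ///     // `R` may not borrow from the store, so clone what's needed out.
    ///     let name: String = accessor.with(|mut access| access.get().name.clone());
    ///     format!("hello, {name}")
    /// }
    /// ```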
    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                get_data: self.get_data,
            })
        })
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }

    /// Changes this accessor to access `D2` instead of the current type
    /// parameter `D`.
    ///
    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
    ///
    /// # Panics
    ///
    /// When using this API the returned value is disconnected from `&self` and
    /// the lifetime binding the `self` argument. An `Accessor` only works
    /// within the context of the closure or async closure that it was
    /// originally given to, however. This means that due to the fact that the
    /// returned value has no lifetime connection it's possible to use the
    /// accessor outside of `&self`, the original accessor, and panic.
    ///
    /// The returned value should only be used within the scope of the original
    /// `Accessor` that `self` refers to.
    pub fn with_getter<D2: HasData>(
        &self,
        get_data: fn(&mut T) -> D2::Data<'_>,
    ) -> Accessor<T, D2> {
        Accessor {
            token: self.token,
            get_data,
        }
    }

    /// Spawn a background task which will receive an `&Accessor<T, D>` and
    /// run concurrently with any other tasks in progress for the current
    /// store.
    ///
    /// This is particularly useful for host functions which return a `stream`
    /// or `future` such that the code to write to the write end of that
    /// `stream` or `future` must run after the function returns.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    ///
    /// # Panics
    ///
    /// Panics if called within a closure provided to the [`Accessor::with`]
    /// function. This can only be called outside an active invocation of
    /// [`Accessor::with`].
    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = self.clone_for_spawn();
        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
    }

    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
        }
    }
}

/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
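///
/// # Example
///
/// A minimal sketch of implementing this trait for a background task and
/// spawning it (the `Ctx` type and its `ticks` field are hypothetical):
///
/// ```ignore
/// struct Ticker;
///
/// impl AccessorTask<Ctx, HasSelf<Ctx>, Result<()>> for Ticker {
///     fn run(self, accessor: &Accessor<Ctx>) -> impl Future<Output = Result<()>> + Send {
///         async move {
///             accessor.with(|mut access| access.get().ticks += 1);
///             Ok(())
///         }
///     }
/// }
///
/// // Later, e.g. inside a concurrent host function:
/// // let handle = accessor.spawn(Ticker);
/// ```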
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}

/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        params: Vec<ValRaw>,
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        params: Vec<ValRaw>,
        result_count: u32,
    },
}

/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback(Instance),
}

/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding { thread: QualifiedThreadId },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending { thread: QualifiedThreadId },
}

/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}

impl fmt::Debug for GuestCallKind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DeliverEvent { instance, set } => f
                .debug_struct("DeliverEvent")
                .field("instance", instance)
                .field("set", set)
                .finish(),
            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
        }
    }
}

/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    thread: QualifiedThreadId,
    kind: GuestCallKind,
}

impl GuestCall {
    /// Returns whether or not the call is ready to run.
    ///
    /// A call will not be ready to run if either:
    ///
    /// - the (sub-)component instance to be called has already been entered and
    /// cannot be reentered until an in-progress call completes
    ///
    /// - the call is for a not-yet started task and the (sub-)component
    /// instance to be called has backpressure enabled
    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
        let task_instance = state.get_mut(self.thread.task)?.instance;
        let state = state.instance_state(task_instance);

        let ready = match &self.kind {
            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
            GuestCallKind::StartExplicit(_) => true,
        };
        log::trace!(
            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
            state.do_not_enter,
            state.backpressure
        );
        Ok(ready)
    }
}

/// Job to be run on a worker fiber.
enum WorkerItem {
    GuestCall(GuestCall),
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}

/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
#[derive(Debug)]
struct PollParams {
    /// The instance to which the polling thread belongs.
    instance: Instance,
    /// The polling thread.
    thread: QualifiedThreadId,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}

/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}

impl fmt::Debug for WorkItem {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
        }
    }
}

/// Whether a suspension intrinsic was cancelled or completed
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}

/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // If there is no current guest thread set, that means the host function was
    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
    // should complete immediately.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // Save any existing result stashed in `GuestTask::result` so we can replace
    // it with the new result.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Add a temporary host task into the table so we can track its progress.
    // Note that we'll never allocate a waitable handle for the guest since
    // we're being called synchronously.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // It completed immediately; check the result and delete the task.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // It did not complete immediately; add it to
            // `ConcurrentState::futures` so it will be polled via the event
            // loop; then use `GuestTask::sync_call_set` to wait for the task to
            // complete, suspending the current fiber until it does so.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
            })?;
        }
    }

    // Retrieve and return the result.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}

/// Execute the specified guest call.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    match call.kind {
        GuestCallKind::DeliverEvent { instance, set } => {
            let (event, waitable) = instance
                .get_event(store, call.thread.task, set, true)?
                .unwrap();
            let state = store.concurrent_state_mut();
            let task = state.get_mut(call.thread.task)?;
            let runtime_instance = task.instance;
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);

            log::trace!(
                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                call.thread,
            );

            let old_thread = state.guest_thread.replace(call.thread);
            log::trace!(
                "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                call.thread
            );

            store.maybe_push_call_context(call.thread.task)?;

            let state = store.concurrent_state_mut();
            state.enter_instance(runtime_instance);

            let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();

            let code = callback(store, runtime_instance, event, handle)?;

            let state = store.concurrent_state_mut();

            state.get_mut(call.thread.task)?.callback = Some(callback);
            state.exit_instance(runtime_instance)?;

            store.maybe_pop_call_context(call.thread.task)?;

            instance.handle_callback_code(store, call.thread, runtime_instance, code)?;

            store.concurrent_state_mut().guest_thread = old_thread;
            log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
        }
        GuestCallKind::StartImplicit(fun) => {
            fun(store)?;
        }
        GuestCallKind::StartExplicit(fun) => {
            fun(store)?;
        }
    }

    Ok(())
}

impl<T> Store<T> {
    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.as_context_mut().run_concurrent(fun).await
    }

    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Convenience wrapper for [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}

impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_guest_tables_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Spawn a background task to run as part of this store's event loop.
    ///
    /// The task will receive an `&Accessor<T>` and run concurrently with
    /// any other tasks in progress for the store.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this store is run.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }

    /// Internal implementation of `spawn` functions where a `store` is
    /// available along with an `Accessor`.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D, Result<()>>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        // Create an "abortable future" here where internally the future will
        // hook calls to poll and possibly spawn more background tasks on each
        // iteration.
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }

    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.  This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress in order for the closure to make progress.
    /// This is an artifact of Wasmtime's historical implementation of `async`
    /// functions and is the topic of [#11869] and [#11870]. In the timeout
    /// example from above it means that Wasmtime can get "wedged" for a bit
    /// where (a) must progress for a readiness notification of (b) to get
    /// delivered.
    ///
    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which is what timeouts, for
    /// example, are based on. Fixing this requires some relatively major
    /// refactoring work within Wasmtime itself. This is a known pitfall
    /// otherwise and one that is intended to be fixed one day. In the meantime
    /// it's recommended to apply timeouts or such to the entire
    /// `run_concurrent` call itself rather than internally.
    ///
    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
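    ///
    /// As a sketch of that recommendation (assuming a Tokio runtime is
    /// available for `tokio::time::timeout`):
    ///
    /// ```ignore
    /// // Apply the timeout to the entire `run_concurrent` call rather than
    /// // to an individual wasm invocation inside the closure.
    /// let result = tokio::time::timeout(
    ///     std::time::Duration::from_secs(5),
    ///     store.run_concurrent(async |accessor| {
    ///         func.call_concurrent(accessor, ()).await
    ///     }),
    /// )
    /// .await;
    /// ```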
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   anyhow::{Result},
    /// #   wasmtime::{
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, false).await
    }

    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }

    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }

    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }

    /// Handle the specified work item, possibly resuming a fiber if applicable.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            WorkItem::GuestCall(call) => {
                let state = self.0.concurrent_state_mut();
                if call.is_ready(state)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    let runtime_instance = state.get_mut(call.thread.task)?.instance;
                    state
                        .instance_state(runtime_instance)
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::Poll(params) => {
                let state = self.0.concurrent_state_mut();
                if state.get_mut(params.thread.task)?.event.is_some()
                    || !state.get_mut(params.set)?.ready.is_empty()
                {
                    // There's at least one event immediately available; deliver
                    // it to the guest ASAP.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: params.thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: params.instance,
                            set: Some(params.set),
                        },
                    }));
                } else {
                    // There are no events immediately available; deliver
                    // `Event::None` to the guest.
                    state.get_mut(params.thread.task)?.event = Some(Event::None);
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: params.thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: params.instance,
                            set: Some(params.set),
                        },
                    }));
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }

1351    /// Execute the specified work item on a worker fiber.
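    ///
    /// The worker fiber is created lazily and cached in
    /// `ConcurrentState::worker` between items: it loops, taking the next
    /// `ConcurrentState::worker_item`, running it, and then suspending itself
    /// with `SuspendReason::NeedWork` until more work arrives.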
1352    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1353    where
1354        T: Send,
1355    {
1356        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1357            fiber
1358        } else {
1359            fiber::make_fiber(self.0, move |store| {
1360                loop {
1361                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1362                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1363                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1364                    }
1365
1366                    store.suspend(SuspendReason::NeedWork)?;
1367                }
1368            })?
1369        };
1370
1371        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1372        assert!(worker_item.is_none());
1373        *worker_item = Some(item);
1374
1375        self.0.resume_fiber(worker).await
1376    }
1377
1378    /// Wrap the specified host function in a future which will call it, passing
1379    /// it an `&Accessor<T>`.
1380    ///
1381    /// See the `Accessor` documentation for details.
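    ///
    /// A minimal sketch of the closure shape this expects (illustrative only;
    /// `store` is assumed to be a `StoreContextMut<T>`, and the body of the
    /// async block is hypothetical):
    ///
    /// ```ignore
    /// let fut = store.wrap_call(|accessor: &Accessor<T>| {
    ///     Box::pin(async move {
    ///         // Use `accessor` here to access the store between await
    ///         // points (store access cannot be held across an await).
    ///         Ok(())
    ///     })
    /// });
    /// // The returned future captures no store borrow (it is `'static`) and
    /// // can be polled later, e.g. from the event loop.
    /// let _result: Result<()> = fut.await;
    /// ```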
1382    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1383    where
1384        T: 'static,
1385        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1386            + Send
1387            + Sync
1388            + 'static,
1389        R: Send + Sync + 'static,
1390    {
1391        let token = StoreToken::new(self);
1392        async move {
1393            let mut accessor = Accessor::new(token);
1394            closure(&mut accessor).await
1395        }
1396    }
1397}
1398
1399impl StoreOpaque {
1400    /// Resume the specified fiber, giving it exclusive access to the specified
1401    /// store.
1402    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1403        let old_thread = self.concurrent_state_mut().guest_thread;
1404        log::trace!("resume_fiber: save current thread {old_thread:?}");
1405
1406        let fiber = fiber::resolve_or_release(self, fiber).await?;
1407
1408        let state = self.concurrent_state_mut();
1409
1410        state.guest_thread = old_thread;
1411        if let Some(ref ot) = old_thread {
1412            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1413        }
1414        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1415
1416        if let Some(mut fiber) = fiber {
1417            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1418            // See the `SuspendReason` documentation for what each case means.
1419            match state.suspend_reason.take().unwrap() {
1420                SuspendReason::NeedWork => {
1421                    if state.worker.is_none() {
1422                        state.worker = Some(fiber);
1423                    } else {
1424                        fiber.dispose(self);
1425                    }
1426                }
1427                SuspendReason::Yielding { thread, .. } => {
1428                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1429                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1430                }
1431                SuspendReason::ExplicitlySuspending { thread, .. } => {
1432                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1433                }
1434                SuspendReason::Waiting { set, thread } => {
1435                    let old = state
1436                        .get_mut(set)?
1437                        .waiting
1438                        .insert(thread, WaitMode::Fiber(fiber));
1439                    assert!(old.is_none());
1440                }
1441            };
1442        } else {
1443            log::trace!("resume_fiber: fiber has exited");
1444        }
1445
1446        Ok(())
1447    }
1448
1449    /// Suspend the current fiber, storing the reason in
1450    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1451    /// it should be resumed.
1452    ///
1453    /// See the `SuspendReason` documentation for details.
1454    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1455        log::trace!("suspend fiber: {reason:?}");
1456
1457        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1458        // pop the call context which manages resource borrows before suspending
1459        // and then push it again once we've resumed.
1460        let task = match &reason {
1461            SuspendReason::Yielding { thread, .. }
1462            | SuspendReason::Waiting { thread, .. }
1463            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1464            SuspendReason::NeedWork => None,
1465        };
1466
1467        let old_guest_thread = if let Some(task) = task {
1468            self.maybe_pop_call_context(task)?;
1469            self.concurrent_state_mut().guest_thread
1470        } else {
1471            None
1472        };
1473
1474        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1475        assert!(suspend_reason.is_none());
1476        *suspend_reason = Some(reason);
1477
1478        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1479
1480        if let Some(task) = task {
1481            self.concurrent_state_mut().guest_thread = old_guest_thread;
1482            self.maybe_push_call_context(task)?;
1483        }
1484
1485        Ok(())
1486    }
1487
1488    /// Push the call context for managing resource borrows for the specified
1489    /// guest task if it has not yet either returned a result or cancelled
1490    /// itself.
1491    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1492        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1493
1494        if !task.returned_or_cancelled() {
1495            log::trace!("push call context for {guest_task:?}");
1496            let call_context = task.call_context.take().unwrap();
1497            self.component_resource_state().0.push(call_context);
1498        }
1499        Ok(())
1500    }
1501
1502    /// Pop the call context for managing resource borrows for the specified
1503    /// guest task if it has not yet either returned a result or cancelled
1504    /// itself.
1505    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1506        if !self
1507            .concurrent_state_mut()
1508            .get_mut(guest_task)?
1509            .returned_or_cancelled()
1510        {
1511            log::trace!("pop call context for {guest_task:?}");
1512            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1513            self.concurrent_state_mut()
1514                .get_mut(guest_task)?
1515                .call_context = call_context;
1516        }
1517        Ok(())
1518    }
1519
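    /// Temporarily move `waitable` into the current guest task's
    /// `sync_call_set`, suspend the current fiber until an event is delivered
    /// for that set, and then restore the waitable's original set membership.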
1520    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1521        let state = self.concurrent_state_mut();
1522        let caller = state.guest_thread.unwrap();
1523        let old_set = waitable.common(state)?.set;
1524        let set = state.get_mut(caller.task)?.sync_call_set;
1525        waitable.join(state, Some(set))?;
1526        self.suspend(SuspendReason::Waiting {
1527            set,
1528            thread: caller,
1529        })?;
1530        let state = self.concurrent_state_mut();
1531        waitable.join(state, old_set)
1532    }
1533}
1534
1535impl Instance {
1536    /// Get the next pending event for the specified task and (optional)
1537    /// waitable set, along with the waitable handle if applicable.
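    ///
    /// Note that a pending `Event::Cancelled` is only delivered when
    /// `cancellable` is true; otherwise it is left in place for a later,
    /// cancellable call to pick up.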
1538    fn get_event(
1539        self,
1540        store: &mut StoreOpaque,
1541        guest_task: TableId<GuestTask>,
1542        set: Option<TableId<WaitableSet>>,
1543        cancellable: bool,
1544    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1545        let state = store.concurrent_state_mut();
1546
1547        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1548            log::trace!("deliver event {event:?} to {guest_task:?}");
1549
1550            if cancellable || !matches!(event, Event::Cancelled) {
1551                return Ok(Some((event, None)));
1552            } else {
1553                state.get_mut(guest_task)?.event = Some(event);
1554            }
1555        }
1556
1557        Ok(
1558            if let Some((set, waitable)) = set
1559                .and_then(|set| {
1560                    state
1561                        .get_mut(set)
1562                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1563                        .transpose()
1564                })
1565                .transpose()?
1566            {
1567                let common = waitable.common(state)?;
1568                let handle = common.handle.unwrap();
1569                let event = common.event.take().unwrap();
1570
1571                log::trace!(
1572                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1573                );
1574
1575                waitable.on_delivery(store, self, event);
1576
1577                Some((event, Some((waitable, handle))))
1578            } else {
1579                None
1580            },
1581        )
1582    }
1583
1584    /// Handle the `CallbackCode` returned from an async-lifted export or its
1585    /// callback.
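    ///
    /// The raw `code` packs both the `CallbackCode` discriminant and, for
    /// `WAIT`/`POLL`, a waitable-set handle.  A plausible shape for
    /// `unpack_callback_code`, assuming the Canonical ABI packing of the
    /// handle into the upper 28 bits and the code into the low 4 bits, would
    /// be:
    ///
    /// ```ignore
    /// fn unpack_callback_code(code: u32) -> (u32, u32) {
    ///     // (callback code, waitable-set handle)
    ///     (code & 0xf, code >> 4)
    /// }
    /// ```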
1586    fn handle_callback_code(
1587        self,
1588        store: &mut StoreOpaque,
1589        guest_thread: QualifiedThreadId,
1590        runtime_instance: RuntimeComponentInstanceIndex,
1591        code: u32,
1592    ) -> Result<()> {
1593        let (code, set) = unpack_callback_code(code);
1594
1595        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1596
1597        let state = store.concurrent_state_mut();
1598
1599        let get_set = |store, handle| {
1600            if handle == 0 {
1601                bail!("invalid waitable-set handle");
1602            }
1603
1604            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1605                .waitable_set_rep(handle)?;
1606
1607            Ok(TableId::<WaitableSet>::new(set))
1608        };
1609
1610        match code {
1611            callback_code::EXIT => {
1612                log::trace!("implicit thread {guest_thread:?} completed");
1613                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1614                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1615                if task.threads.is_empty() && !task.returned_or_cancelled() {
1616                    bail!(Trap::NoAsyncResult);
1617                }
1618                match &task.caller {
1619                    Caller::Host { .. } => {
1620                        if task.ready_to_delete() {
1621                            Waitable::Guest(guest_thread.task)
1622                                .delete_from(store.concurrent_state_mut())?;
1623                        }
1624                    }
1625                    Caller::Guest { .. } => {
1626                        task.exited = true;
1627                        task.callback = None;
1628                    }
1629                }
1630            }
1631            callback_code::YIELD => {
1632                // Push this thread onto the "low priority" queue so it runs after
1633                // any other threads have had a chance to run.
1634                let task = state.get_mut(guest_thread.task)?;
1635                assert!(task.event.is_none());
1636                task.event = Some(Event::None);
1637                state.push_low_priority(WorkItem::GuestCall(GuestCall {
1638                    thread: guest_thread,
1639                    kind: GuestCallKind::DeliverEvent {
1640                        instance: self,
1641                        set: None,
1642                    },
1643                }));
1644            }
1645            callback_code::WAIT | callback_code::POLL => {
1646                let set = get_set(store, set)?;
1647                let state = store.concurrent_state_mut();
1648
1649                if state.get_mut(guest_thread.task)?.event.is_some()
1650                    || !state.get_mut(set)?.ready.is_empty()
1651                {
1652                    // An event is immediately available; deliver it ASAP.
1653                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1654                        thread: guest_thread,
1655                        kind: GuestCallKind::DeliverEvent {
1656                            instance: self,
1657                            set: Some(set),
1658                        },
1659                    }));
1660                } else {
1661                    // No event is immediately available.
1662                    match code {
1663                        callback_code::POLL => {
1664                            // We're polling, so just yield and check whether an
1665                            // event has arrived after that.
1666                            state.push_low_priority(WorkItem::Poll(PollParams {
1667                                instance: self,
1668                                thread: guest_thread,
1669                                set,
1670                            }));
1671                        }
1672                        callback_code::WAIT => {
1673                            // We're waiting, so register to be woken up when an
1674                            // event is published for this waitable set.
1675                            //
1676                            // Here we also set `GuestTask::wake_on_cancel`
1677                            // which allows `subtask.cancel` to interrupt the
1678                            // wait.
1679                            let old = state
1680                                .get_mut(guest_thread.thread)?
1681                                .wake_on_cancel
1682                                .replace(set);
1683                            assert!(old.is_none());
1684                            let old = state
1685                                .get_mut(set)?
1686                                .waiting
1687                                .insert(guest_thread, WaitMode::Callback(self));
1688                            assert!(old.is_none());
1689                        }
1690                        _ => unreachable!(),
1691                    }
1692                }
1693            }
1694            _ => bail!("unsupported callback code: {code}"),
1695        }
1696
1697        Ok(())
1698    }
1699
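    /// Remove the specified guest thread from the guest-facing thread table of
    /// `runtime_instance`, delete it from the concurrent state, and detach it
    /// from its parent task's set of threads.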
1700    fn cleanup_thread(
1701        self,
1702        store: &mut StoreOpaque,
1703        guest_thread: QualifiedThreadId,
1704        runtime_instance: RuntimeComponentInstanceIndex,
1705    ) -> Result<()> {
1706        let guest_id = store
1707            .concurrent_state_mut()
1708            .get_mut(guest_thread.thread)?
1709            .instance_rep;
1710        self.id().get_mut(store).guest_tables().0[runtime_instance]
1711            .guest_thread_remove(guest_id.unwrap())?;
1712
1713        store.concurrent_state_mut().delete(guest_thread.thread)?;
1714        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715        task.threads.remove(&guest_thread.thread);
1716        Ok(())
1717    }
1718
1719    /// Add the specified guest call to the "high priority" work item queue, to
1720    /// be started as soon as backpressure and/or reentrance rules allow.
1721    ///
1722    /// SAFETY: The raw pointer arguments must be valid references to guest
1723    /// functions (with the appropriate signatures) when the closures queued by
1724    /// this function are called.
1725    unsafe fn queue_call<T: 'static>(
1726        self,
1727        mut store: StoreContextMut<T>,
1728        guest_thread: QualifiedThreadId,
1729        callee: SendSyncPtr<VMFuncRef>,
1730        param_count: usize,
1731        result_count: usize,
1732        flags: Option<InstanceFlags>,
1733        async_: bool,
1734        callback: Option<SendSyncPtr<VMFuncRef>>,
1735        post_return: Option<SendSyncPtr<VMFuncRef>>,
1736    ) -> Result<()> {
1737        /// Return a closure which will call the specified function in the scope
1738        /// of the specified task.
1739        ///
1740        /// This will use `GuestTask::lower_params` to lower the parameters, but
1741        /// will not lift the result; instead, it returns a
1742        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1743        /// any, may be lifted.  Note that an async-lifted export will have
1744        /// returned its result using the `task.return` intrinsic (or not
1745        /// returned a result at all, in the case of `task.cancel`), in which
1746        /// case the "result" of this call will either be a callback code or
1747        /// nothing.
1748        ///
1749        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1750        /// the returned closure is called.
1751        unsafe fn make_call<T: 'static>(
1752            store: StoreContextMut<T>,
1753            guest_thread: QualifiedThreadId,
1754            callee: SendSyncPtr<VMFuncRef>,
1755            param_count: usize,
1756            result_count: usize,
1757            flags: Option<InstanceFlags>,
1758        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1759        + Send
1760        + Sync
1761        + 'static
1762        + use<T> {
1763            let token = StoreToken::new(store);
1764            move |store: &mut dyn VMStore| {
1765                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1766
1767                store
1768                    .concurrent_state_mut()
1769                    .get_mut(guest_thread.thread)?
1770                    .state = GuestThreadState::Running;
1771                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1772                let may_enter_after_call = task.call_post_return_automatically();
1773                let lower = task.lower_params.take().unwrap();
1774
1775                lower(store, &mut storage[..param_count])?;
1776
1777                let mut store = token.as_context_mut(store);
1778
1779                // SAFETY: Per the contract documented in `make_call`'s
1780                // documentation, `callee` must be a valid pointer.
1781                unsafe {
1782                    if let Some(mut flags) = flags {
1783                        flags.set_may_enter(false);
1784                    }
1785                    crate::Func::call_unchecked_raw(
1786                        &mut store,
1787                        callee.as_non_null(),
1788                        NonNull::new(
1789                            &mut storage[..param_count.max(result_count)]
1790                                as *mut [MaybeUninit<ValRaw>] as _,
1791                        )
1792                        .unwrap(),
1793                    )?;
1794                    if let Some(mut flags) = flags {
1795                        flags.set_may_enter(may_enter_after_call);
1796                    }
1797                }
1798
1799                Ok(storage)
1800            }
1801        }
1802
1803        // SAFETY: Per the contract described in this function's documentation,
1804        // the `callee` pointer which `call` closes over must be valid when
1805        // called by the closure we queue below.
1806        let call = unsafe {
1807            make_call(
1808                store.as_context_mut(),
1809                guest_thread,
1810                callee,
1811                param_count,
1812                result_count,
1813                flags,
1814            )
1815        };
1816
1817        let callee_instance = store
1818            .0
1819            .concurrent_state_mut()
1820            .get_mut(guest_thread.task)?
1821            .instance;
1822        let fun = if callback.is_some() {
1823            assert!(async_);
1824
1825            Box::new(move |store: &mut dyn VMStore| {
1826                self.add_guest_thread_to_instance_table(
1827                    guest_thread.thread,
1828                    store,
1829                    callee_instance,
1830                )?;
1831                let old_thread = store
1832                    .concurrent_state_mut()
1833                    .guest_thread
1834                    .replace(guest_thread);
1835                log::trace!(
1836                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1837                );
1838
1839                store.maybe_push_call_context(guest_thread.task)?;
1840
1841                store.concurrent_state_mut().enter_instance(callee_instance);
1842
1843                // SAFETY: See the documentation for `make_call` to review the
1844                // contract we must uphold for `call` here.
1845                //
1846                // Per the contract described in the `queue_call`
1847                // documentation, the `callee` pointer which `call` closes
1848                // over must be valid.
1849                let storage = call(store)?;
1850
1851                store
1852                    .concurrent_state_mut()
1853                    .exit_instance(callee_instance)?;
1854
1855                store.maybe_pop_call_context(guest_thread.task)?;
1856
1857                let state = store.concurrent_state_mut();
1858                state.guest_thread = old_thread;
1859                old_thread
1860                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1861                log::trace!("stackless call: restored {old_thread:?} as current thread");
1862
1863                // SAFETY: `wasmparser` will have validated that the callback
1864                // function returns a `i32` result.
1865                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1866
1867                self.handle_callback_code(store, guest_thread, callee_instance, code)?;
1868
1869                Ok(())
1870            }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1871        } else {
1872            let token = StoreToken::new(store.as_context_mut());
1873            Box::new(move |store: &mut dyn VMStore| {
1874                self.add_guest_thread_to_instance_table(
1875                    guest_thread.thread,
1876                    store,
1877                    callee_instance,
1878                )?;
1879                let old_thread = store
1880                    .concurrent_state_mut()
1881                    .guest_thread
1882                    .replace(guest_thread);
1883                log::trace!(
1884                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1885                );
1886                let mut flags = self.id().get(store).instance_flags(callee_instance);
1887
1888                store.maybe_push_call_context(guest_thread.task)?;
1889
1890                // Unless this is a callback-less (i.e. stackful)
1891                // async-lifted export, we need to record that the instance
1892                // cannot be entered until the call returns.
1893                if !async_ {
1894                    store.concurrent_state_mut().enter_instance(callee_instance);
1895                }
1896
1897                // SAFETY: See the documentation for `make_call` to review the
1898                // contract we must uphold for `call` here.
1899                //
1900                // Per the contract described in the `queue_call`
1901                // documentation, the `callee` pointer which `call` closes
1902                // over must be valid.
1903                let storage = call(store)?;
1904
1905                // This is a callback-less call, so the implicit thread has now completed.
1906                self.cleanup_thread(store, guest_thread, callee_instance)?;
1907
1908                if async_ {
1909                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1910                    if task.threads.is_empty() && !task.returned_or_cancelled() {
1911                        bail!(Trap::NoAsyncResult);
1912                    }
1913                } else {
1914                    // This is a sync-lifted export, so now is when we lift the
1915                    // result, call the post-return function, if any,
1916                    // and finally notify any current or future waiters that the
1917                    // subtask has returned.
1918
1919                    let lift = {
1920                        let state = store.concurrent_state_mut();
1921                        state.exit_instance(callee_instance)?;
1922
1923                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
1924
1925                        state
1926                            .get_mut(guest_thread.task)?
1927                            .lift_result
1928                            .take()
1929                            .unwrap()
1930                    };
1931
1932                    // SAFETY: `result_count` represents the number of core Wasm
1933                    // results returned, per `wasmparser`.
1934                    let result = (lift.lift)(store, unsafe {
1935                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1936                            &storage[..result_count],
1937                        )
1938                    })?;
1939
1940                    let post_return_arg = match result_count {
1941                        0 => ValRaw::i32(0),
1942                        // SAFETY: `result_count` represents the number of
1943                        // core Wasm results returned, per `wasmparser`.
1944                        1 => unsafe { storage[0].assume_init() },
1945                        _ => unreachable!(),
1946                    };
1947
1948                    if store
1949                        .concurrent_state_mut()
1950                        .get_mut(guest_thread.task)?
1951                        .call_post_return_automatically()
1952                    {
1953                        unsafe {
1954                            flags.set_may_leave(false);
1955                            flags.set_needs_post_return(false);
1956                        }
1957
1958                        if let Some(func) = post_return {
1959                            let mut store = token.as_context_mut(store);
1960
1961                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1962                            // either `wasmtime-cranelift`-generated fused adapter
1963                            // code or `component::Options`.  Per `wasmparser`
1964                            // post-return signature validation, we know it takes a
1965                            // single parameter.
1966                            unsafe {
1967                                crate::Func::call_unchecked_raw(
1968                                    &mut store,
1969                                    func.as_non_null(),
1970                                    slice::from_ref(&post_return_arg).into(),
1971                                )?;
1972                            }
1973                        }
1974
1975                        unsafe {
1976                            flags.set_may_leave(true);
1977                            flags.set_may_enter(true);
1978                        }
1979                    }
1980
1981                    self.task_complete(
1982                        store,
1983                        guest_thread.task,
1984                        result,
1985                        Status::Returned,
1986                        post_return_arg,
1987                    )?;
1988                }
1989
1990                store.maybe_pop_call_context(guest_thread.task)?;
1991
1992                let state = store.concurrent_state_mut();
1993                let task = state.get_mut(guest_thread.task)?;
1994
1995                match &task.caller {
1996                    Caller::Host { .. } => {
1997                        if task.ready_to_delete() {
1998                            Waitable::Guest(guest_thread.task).delete_from(state)?;
1999                        }
2000                    }
2001                    Caller::Guest { .. } => {
2002                        task.exited = true;
2003                    }
2004                }
2005
2006                Ok(())
2007            })
2008        };
2009
2010        store
2011            .0
2012            .concurrent_state_mut()
2013            .push_high_priority(WorkItem::GuestCall(GuestCall {
2014                thread: guest_thread,
2015                kind: GuestCallKind::StartImplicit(fun),
2016            }));
2017
2018        Ok(())
2019    }
2020
2021    /// Prepare (but do not start) a guest->guest call.
2022    ///
2023    /// This is called from fused adapter code generated in
2024    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2025    /// are synthesized Wasm functions which move the parameters from the caller
2026    /// to the callee and the result from the callee to the caller,
2027    /// respectively.  The adapter will call `Self::start_call` immediately
2028    /// after calling this function.
2029    ///
2030    /// SAFETY: All the pointer arguments must be valid pointers to guest
2031    /// entities (and with the expected signatures for the function references
2032    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2033    unsafe fn prepare_call<T: 'static>(
2034        self,
2035        mut store: StoreContextMut<T>,
2036        start: *mut VMFuncRef,
2037        return_: *mut VMFuncRef,
2038        caller_instance: RuntimeComponentInstanceIndex,
2039        callee_instance: RuntimeComponentInstanceIndex,
2040        task_return_type: TypeTupleIndex,
2041        memory: *mut VMMemoryDefinition,
2042        string_encoding: u8,
2043        caller_info: CallerInfo,
2044    ) -> Result<()> {
2045        self.id().get(store.0).check_may_leave(caller_instance)?;
2046
2047        enum ResultInfo {
2048            Heap { results: u32 },
2049            Stack { result_count: u32 },
2050        }
2051
2052        let result_info = match &caller_info {
2053            CallerInfo::Async {
2054                has_result: true,
2055                params,
2056            } => ResultInfo::Heap {
2057                results: params.last().unwrap().get_u32(),
2058            },
2059            CallerInfo::Async {
2060                has_result: false, ..
2061            } => ResultInfo::Stack { result_count: 0 },
2062            CallerInfo::Sync {
2063                result_count,
2064                params,
2065            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2066                results: params.last().unwrap().get_u32(),
2067            },
2068            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2069                result_count: *result_count,
2070            },
2071        };
2072
2073        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2074
2075        // Create a new guest task for the call, closing over the `start` and
2076        // `return_` functions to lift the parameters and lower the result,
2077        // respectively.
2078        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2079        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2080        let token = StoreToken::new(store.as_context_mut());
2081        let state = store.0.concurrent_state_mut();
2082        let old_thread = state.guest_thread.take();
2083        let new_task = GuestTask::new(
2084            state,
2085            Box::new(move |store, dst| {
2086                let mut store = token.as_context_mut(store);
2087                assert!(dst.len() <= MAX_FLAT_PARAMS);
2088                // The `+ 1` here accounts for the return pointer, if any:
2089                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2090                let count = match caller_info {
2091                    // Async callers, if they have a result, use the last
2092                    // parameter as a return pointer so chop that off if
2093                    // relevant here.
2094                    CallerInfo::Async { params, has_result } => {
2095                        let params = &params[..params.len() - usize::from(has_result)];
2096                        for (param, src) in params.iter().zip(&mut src) {
2097                            src.write(*param);
2098                        }
2099                        params.len()
2100                    }
2101
2102                    // Sync callers forward everything directly.
2103                    CallerInfo::Sync { params, .. } => {
2104                        for (param, src) in params.iter().zip(&mut src) {
2105                            src.write(*param);
2106                        }
2107                        params.len()
2108                    }
2109                };
2110                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2111                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2112                // how it was constructed (see
2113                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2114                // for details) we know it takes `count` parameters and returns
2115                // `dst.len()` results.
2116                unsafe {
2117                    crate::Func::call_unchecked_raw(
2118                        &mut store,
2119                        start.as_non_null(),
2120                        NonNull::new(
2121                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2122                        )
2123                        .unwrap(),
2124                    )?;
2125                }
2126                dst.copy_from_slice(&src[..dst.len()]);
2127                let state = store.0.concurrent_state_mut();
2128                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2129                    state,
2130                    Some(Event::Subtask {
2131                        status: Status::Started,
2132                    }),
2133                )?;
2134                Ok(())
2135            }),
2136            LiftResult {
2137                lift: Box::new(move |store, src| {
2138                    // SAFETY: See comment in closure passed as `lower_params`
2139                    // parameter above.
2140                    let mut store = token.as_context_mut(store);
2141                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2142                    if let ResultInfo::Heap { results } = &result_info {
2143                        my_src.push(ValRaw::u32(*results));
2144                    }
2145                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2146                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2147                    // on how it was constructed (see
2148                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2149                    // for details) we know it takes `src.len()` parameters and
2150                    // returns up to 1 result.
2151                    unsafe {
2152                        crate::Func::call_unchecked_raw(
2153                            &mut store,
2154                            return_.as_non_null(),
2155                            my_src.as_mut_slice().into(),
2156                        )?;
2157                    }
2158                    let state = store.0.concurrent_state_mut();
2159                    let thread = state.guest_thread.unwrap();
2160                    if sync_caller {
2161                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2162                            if let ResultInfo::Stack { result_count } = &result_info {
2163                                match result_count {
2164                                    0 => None,
2165                                    1 => Some(my_src[0]),
2166                                    _ => unreachable!(),
2167                                }
2168                            } else {
2169                                None
2170                            },
2171                        );
2172                    }
2173                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2174                }),
2175                ty: task_return_type,
2176                memory: NonNull::new(memory).map(SendSyncPtr::new),
2177                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2178            },
2179            Caller::Guest {
2180                thread: old_thread.unwrap(),
2181                instance: caller_instance,
2182            },
2183            None,
2184            callee_instance,
2185        )?;
2186
2187        let guest_task = state.push(new_task)?;
2188        let new_thread = GuestThread::new_implicit(guest_task);
2189        let guest_thread = state.push(new_thread)?;
2190        state.get_mut(guest_task)?.threads.insert(guest_thread);
2191
2192        let state = store.0.concurrent_state_mut();
2193        if let Some(old_thread) = old_thread {
2194            if !state.may_enter(guest_task) {
2195                bail!(crate::Trap::CannotEnterComponent);
2196            }
2197
2198            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2199        };
2200
2201        // Make the new thread the current one so that `Self::start_call` knows
2202        // which one to start.
2203        state.guest_thread = Some(QualifiedThreadId {
2204            task: guest_task,
2205            thread: guest_thread,
2206        });
2207        log::trace!(
2208            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2209        );
2210
2211        Ok(())
2212    }
2213
2214    /// Call the specified callback function for an async-lifted export.
2215    ///
2216    /// SAFETY: `function` must be a valid reference to a guest function of the
2217    /// correct signature for a callback.
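    ///
    /// The callback receives the event ordinal, the waitable `handle`, and the
    /// event's result payload (see `Event::parts`) as three `i32` parameters
    /// and returns a single `i32` callback code.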
2218    unsafe fn call_callback<T>(
2219        self,
2220        mut store: StoreContextMut<T>,
2221        callee_instance: RuntimeComponentInstanceIndex,
2222        function: SendSyncPtr<VMFuncRef>,
2223        event: Event,
2224        handle: u32,
2225        may_enter_after_call: bool,
2226    ) -> Result<u32> {
2227        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2228
2229        let (ordinal, result) = event.parts();
2230        let params = &mut [
2231            ValRaw::u32(ordinal),
2232            ValRaw::u32(handle),
2233            ValRaw::u32(result),
2234        ];
2235        // SAFETY: `function` is a valid `*mut VMFuncRef` from either
2236        // `wasmtime-cranelift`-generated fused adapter code or
2237        // `component::Options`.  Per `wasmparser` callback signature
2238        // validation, we know it takes three parameters and returns one.
2239        unsafe {
2240            flags.set_may_enter(false);
2241            crate::Func::call_unchecked_raw(
2242                &mut store,
2243                function.as_non_null(),
2244                params.as_mut_slice().into(),
2245            )?;
2246            flags.set_may_enter(may_enter_after_call);
2247        }
2248        Ok(params[0].get_u32())
2249    }
2250
2251    /// Start a guest->guest call previously prepared using
2252    /// `Self::prepare_call`.
2253    ///
2254    /// This is called from fused adapter code generated in
2255    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2256    /// this function immediately after calling `Self::prepare_call`.
2257    ///
2258    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2259    /// functions with the appropriate signatures for the current guest task.
2260    /// If this is a call to an async-lowered import, the actual call may be
2261    /// deferred and run after this function returns, in which case the pointer
2262    /// arguments must also be valid when the call happens.
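    ///
    /// The return value is the subtask's `Status` packed together with the
    /// waitable handle, if one was allocated for an async-lowered caller.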
2263    unsafe fn start_call<T: 'static>(
2264        self,
2265        mut store: StoreContextMut<T>,
2266        callback: *mut VMFuncRef,
2267        post_return: *mut VMFuncRef,
2268        callee: *mut VMFuncRef,
2269        param_count: u32,
2270        result_count: u32,
2271        flags: u32,
2272        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2273    ) -> Result<u32> {
2274        let token = StoreToken::new(store.as_context_mut());
2275        let async_caller = storage.is_none();
2276        let state = store.0.concurrent_state_mut();
2277        let guest_thread = state.guest_thread.unwrap();
2278        let may_enter_after_call = state
2279            .get_mut(guest_thread.task)?
2280            .call_post_return_automatically();
2281        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2282        let param_count = usize::try_from(param_count).unwrap();
2283        assert!(param_count <= MAX_FLAT_PARAMS);
2284        let result_count = usize::try_from(result_count).unwrap();
2285        assert!(result_count <= MAX_FLAT_RESULTS);
2286
2287        let task = state.get_mut(guest_thread.task)?;
2288        if !callback.is_null() {
2289            // We're calling an async-lifted export with a callback, so store
2290            // the callback and related context as part of the task so we can
2291            // call it later when needed.
2292            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2293            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2294                let store = token.as_context_mut(store);
2295                unsafe {
2296                    self.call_callback::<T>(
2297                        store,
2298                        runtime_instance,
2299                        callback,
2300                        event,
2301                        handle,
2302                        may_enter_after_call,
2303                    )
2304                }
2305            }));
2306        }
2307
2308        let Caller::Guest {
2309            thread: caller,
2310            instance: runtime_instance,
2311        } = &task.caller
2312        else {
2313            // As of this writing, `start_call` is only used for guest->guest
2314            // calls.
2315            unreachable!()
2316        };
2317        let caller = *caller;
2318        let caller_instance = *runtime_instance;
2319
2320        let callee_instance = task.instance;
2321
2322        let instance_flags = if callback.is_null() {
2323            None
2324        } else {
2325            Some(self.id().get(store.0).instance_flags(callee_instance))
2326        };
2327
2328        // Queue the call as a "high priority" work item.
2329        unsafe {
2330            self.queue_call(
2331                store.as_context_mut(),
2332                guest_thread,
2333                callee,
2334                param_count,
2335                result_count,
2336                instance_flags,
2337                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2338                NonNull::new(callback).map(SendSyncPtr::new),
2339                NonNull::new(post_return).map(SendSyncPtr::new),
2340            )?;
2341        }
2342
2343        let state = store.0.concurrent_state_mut();
2344
2345        // Use the caller's `GuestTask::sync_call_set` to register interest in
2346        // the subtask...
2347        let guest_waitable = Waitable::Guest(guest_thread.task);
2348        let old_set = guest_waitable.common(state)?.set;
2349        let set = state.get_mut(caller.task)?.sync_call_set;
2350        guest_waitable.join(state, Some(set))?;
2351
2352        // ... and suspend this fiber temporarily while we wait for it to start.
2353        //
2354        // Note that we _could_ call the callee directly using the current fiber
2355        // rather than suspend this one, but that would make reasoning about the
2356        // event loop more complicated and is probably only worth doing if
2357        // there's a measurable performance benefit.  In addition, it would mean
2358        // blocking the caller if the callee calls a blocking sync-lowered
2359        // import, and as of this writing the spec says we must not do that.
2360        //
2361        // Alternatively, the fused adapter code could be modified to call the
2362        // callee directly without calling a host-provided intrinsic at all (in
2363        // which case it would need to do its own, inline backpressure checks,
2364        // etc.).  Again, we'd want to see a measurable performance benefit
2365        // before committing to such an optimization.  And again, we'd need to
2366        // update the spec to allow that.
2367        let (status, waitable) = loop {
2368            store.0.suspend(SuspendReason::Waiting {
2369                set,
2370                thread: caller,
2371            })?;
2372
2373            let state = store.0.concurrent_state_mut();
2374
2375            log::trace!("taking event for {:?}", guest_thread.task);
2376            let event = guest_waitable.take_event(state)?;
2377            let Some(Event::Subtask { status }) = event else {
2378                unreachable!();
2379            };
2380
2381            log::trace!("status {status:?} for {:?}", guest_thread.task);
2382
2383            if status == Status::Returned {
2384                // It returned, so we can stop waiting.
2385                break (status, None);
2386            } else if async_caller {
2387                // It hasn't returned yet, but the caller is calling via an
2388                // async-lowered import, so we generate a handle for the task
2389                // waitable and return the status.
2390                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2391                    .subtask_insert_guest(guest_thread.task.rep())?;
2392                store
2393                    .0
2394                    .concurrent_state_mut()
2395                    .get_mut(guest_thread.task)?
2396                    .common
2397                    .handle = Some(handle);
2398                break (status, Some(handle));
2399            } else {
2400                // The callee hasn't returned yet, and the caller is calling via
2401                // a sync-lowered import, so we loop and keep waiting until the
2402                // callee returns.
2403            }
2404        };
2405
2406        let state = store.0.concurrent_state_mut();
2407
2408        guest_waitable.join(state, old_set)?;
2409
2410        if let Some(storage) = storage {
2411            // The caller used a sync-lowered import to call an async-lifted
2412            // export, in which case the result, if any, has been stashed in
2413            // `GuestTask::sync_result`.
2414            let task = state.get_mut(guest_thread.task)?;
2415            if let Some(result) = task.sync_result.take() {
2416                if let Some(result) = result {
2417                    storage[0] = MaybeUninit::new(result);
2418                }
2419
2420                if task.exited {
2421                    if task.ready_to_delete() {
2422                        Waitable::Guest(guest_thread.task).delete_from(state)?;
2423                    }
2424                }
2425            }
2426        }
2427
2428        // Reset the current thread to point to the caller as it resumes control.
2429        state.guest_thread = Some(caller);
2430        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2431        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2432
2433        Ok(status.pack(waitable))
2434    }
2435
2436    /// Poll the specified future once on behalf of a guest->host call using an
2437    /// async-lowered import.
2438    ///
2439    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2440    /// `Pending`, add it to the set of futures to be polled as part of this
2441    /// instance's event loop until it completes, and then return
2442    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2443    ///
2444    /// Whether the future returns `Ready` immediately or later, the `lower`
2445    /// function will be used to lower the result, if any, into the guest caller's
2446    /// stack and linear memory unless the task has been cancelled.
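    ///
    /// A rough sketch of how a caller might use this (illustrative only;
    /// `caller_instance`, `compute`, and `lower_result` are hypothetical
    /// placeholders rather than real APIs):
    ///
    /// ```ignore
    /// let waitable = instance.first_poll(
    ///     store.as_context_mut(),
    ///     async move { Ok(compute().await) },
    ///     caller_instance,
    ///     |store, value| lower_result(store, value),
    /// )?;
    /// match waitable {
    ///     // Completed synchronously; the result has already been lowered.
    ///     None => { /* report immediate completion to the guest */ }
    ///     // Still pending; the future is now owned by the event loop.
    ///     Some(handle) => { /* return `handle` to the guest as a waitable */ }
    /// }
    /// ```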
2447    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2448        self,
2449        mut store: StoreContextMut<T>,
2450        future: impl Future<Output = Result<R>> + Send + 'static,
2451        caller_instance: RuntimeComponentInstanceIndex,
2452        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2453    ) -> Result<Option<u32>> {
2454        let token = StoreToken::new(store.as_context_mut());
2455        let state = store.0.concurrent_state_mut();
2456        let caller = state.guest_thread.unwrap();
2457
2458        // Create an abortable future which hooks calls to poll and manages call
2459        // context state for the future.
2460        let (join_handle, future) = JoinHandle::run(async move {
2461            let mut future = pin!(future);
2462            let mut call_context = None;
2463            future::poll_fn(move |cx| {
2464                // Push the call context for managing any resource borrows
2465                // for the task.
2466                tls::get(|store| {
2467                    if let Some(call_context) = call_context.take() {
2468                        token
2469                            .as_context_mut(store)
2470                            .0
2471                            .component_resource_state()
2472                            .0
2473                            .push(call_context);
2474                    }
2475                });
2476
2477                let result = future.as_mut().poll(cx);
2478
2479                if result.is_pending() {
2480                    // Pop the call context for managing any resource
2481                    // borrows for the task.
2482                    tls::get(|store| {
2483                        call_context = Some(
2484                            token
2485                                .as_context_mut(store)
2486                                .0
2487                                .component_resource_state()
2488                                .0
2489                                .pop()
2490                                .unwrap(),
2491                        );
2492                    });
2493                }
2494                result
2495            })
2496            .await
2497        });
2498
2499        // We create a new host task even though it might complete immediately
2500        // (in which case we won't need to pass a waitable back to the guest).
2501        // If it does complete immediately, we'll remove it before we return.
2502        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2503
2504        log::trace!("new host task child of {caller:?}: {task:?}");
2505
2506        let mut future = Box::pin(future);
2507
2508        // Finally, poll the future.  We can use a dummy `Waker` here because
2509        // we'll add the future to `ConcurrentState::futures` and poll it
2510        // automatically from the event loop if it doesn't complete immediately
2511        // here.
2512        let poll = tls::set(store.0, || {
2513            future
2514                .as_mut()
2515                .poll(&mut Context::from_waker(&Waker::noop()))
2516        });
2517
2518        Ok(match poll {
2519            Poll::Ready(None) => unreachable!(),
2520            Poll::Ready(Some(result)) => {
2521                // It finished immediately; lower the result and delete the
2522                // task.
2523                lower(store.as_context_mut(), result?)?;
2524                log::trace!("delete host task {task:?} (already ready)");
2525                store.0.concurrent_state_mut().delete(task)?;
2526                None
2527            }
2528            Poll::Pending => {
2529                // It hasn't finished yet; add the future to
2530                // `ConcurrentState::futures` so it will be polled by the event
2531                // loop and allocate a waitable handle to return to the guest.
2532
2533                // Wrap the future in an outer future responsible for lowering the result into
2534                // the guest's stack and memory, as well as notifying any waiters that
2535                // the task returned.
2536                let future =
2537                    Box::pin(async move {
2538                        let result = match future.await {
2539                            Some(result) => result?,
2540                            // Task was cancelled; nothing left to do.
2541                            None => return Ok(()),
2542                        };
2543                        tls::get(move |store| {
2544                            // Here we schedule a task to run on a worker fiber to do
2545                            // the lowering since it may involve a call to the guest's
2546                            // realloc function.  This is necessary because calling the
2547                            // guest while there are host embedder frames on the stack
2548                            // is unsound.
2549                            store.concurrent_state_mut().push_high_priority(
2550                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2551                                    lower(token.as_context_mut(store), result)?;
2552                                    let state = store.concurrent_state_mut();
2553                                    state.get_mut(task)?.join_handle.take();
2554                                    Waitable::Host(task).set_event(
2555                                        state,
2556                                        Some(Event::Subtask {
2557                                            status: Status::Returned,
2558                                        }),
2559                                    )
2560                                }))),
2561                            );
2562                            Ok(())
2563                        })
2564                    });
2565
2566                store.0.concurrent_state_mut().push_future(future);
2567                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2568                    .subtask_insert_host(task.rep())?;
2569                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2570                log::trace!(
2571                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2572                );
2573                Some(handle)
2574            }
2575        })
2576    }
2577
2578    /// Implements the `task.return` intrinsic, lifting the result for the
2579    /// current guest task.
2580    pub(crate) fn task_return(
2581        self,
2582        store: &mut dyn VMStore,
2583        caller: RuntimeComponentInstanceIndex,
2584        ty: TypeTupleIndex,
2585        options: OptionsIndex,
2586        storage: &[ValRaw],
2587    ) -> Result<()> {
2588        self.id().get(store).check_may_leave(caller)?;
2589        let state = store.concurrent_state_mut();
2590        let guest_thread = state.guest_thread.unwrap();
2591        let lift = state
2592            .get_mut(guest_thread.task)?
2593            .lift_result
2594            .take()
2595            .ok_or_else(|| {
2596                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2597            })?;
2598        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2599
2600        let CanonicalOptions {
2601            string_encoding,
2602            data_model,
2603            ..
2604        } = &self.id().get(store).component().env_component().options[options];
2605
2606        let invalid = ty != lift.ty
2607            || string_encoding != &lift.string_encoding
2608            || match data_model {
2609                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2610                    Some(memory) => {
2611                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2612                        let actual = self.id().get(store).runtime_memory(memory);
2613                        expected != actual.as_ptr()
2614                    }
                    // Memory was not specified; validation only permits that
                    // when a memory isn't required, so this is not invalid.
2617                    None => false,
2618                },
2619                // Always invalid as this isn't supported.
2620                CanonicalOptionsDataModel::Gc { .. } => true,
2621            };
2622
2623        if invalid {
2624            bail!("invalid `task.return` signature and/or options for current task");
2625        }
2626
2627        log::trace!("task.return for {guest_thread:?}");
2628
2629        let result = (lift.lift)(store, storage)?;
2630        self.task_complete(
2631            store,
2632            guest_thread.task,
2633            result,
2634            Status::Returned,
2635            ValRaw::i32(0),
2636        )
2637    }
2638
2639    /// Implements the `task.cancel` intrinsic.
2640    pub(crate) fn task_cancel(
2641        self,
2642        store: &mut StoreOpaque,
2643        caller: RuntimeComponentInstanceIndex,
2644    ) -> Result<()> {
2645        self.id().get(store).check_may_leave(caller)?;
2646        let state = store.concurrent_state_mut();
2647        let guest_thread = state.guest_thread.unwrap();
2648        let task = state.get_mut(guest_thread.task)?;
2649        if !task.cancel_sent {
2650            bail!("`task.cancel` called by task which has not been cancelled")
2651        }
2652        _ = task.lift_result.take().ok_or_else(|| {
2653            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2654        })?;
2655
2656        assert!(task.result.is_none());
2657
2658        log::trace!("task.cancel for {guest_thread:?}");
2659
2660        self.task_complete(
2661            store,
2662            guest_thread.task,
2663            Box::new(DummyResult),
2664            Status::ReturnCancelled,
2665            ValRaw::i32(0),
2666        )
2667    }
2668
2669    /// Complete the specified guest task (i.e. indicate that it has either
2670    /// returned a (possibly empty) result or cancelled itself).
2671    ///
2672    /// This will return any resource borrows and notify any current or future
2673    /// waiters that the task has completed.
2674    fn task_complete(
2675        self,
2676        store: &mut StoreOpaque,
2677        guest_task: TableId<GuestTask>,
2678        result: Box<dyn Any + Send + Sync>,
2679        status: Status,
2680        post_return_arg: ValRaw,
2681    ) -> Result<()> {
2682        if store
2683            .concurrent_state_mut()
2684            .get_mut(guest_task)?
2685            .call_post_return_automatically()
2686        {
2687            let (calls, host_table, _, instance) =
2688                store.component_resource_state_with_instance(self);
2689            ResourceTables {
2690                calls,
2691                host_table: Some(host_table),
2692                guest: Some(instance.guest_tables()),
2693            }
2694            .exit_call()?;
2695        } else {
2696            // As of this writing, the only scenario where `call_post_return_automatically`
2697            // would be false for a `GuestTask` is for host-to-guest calls using
2698            // `[Typed]Func::call_async`, in which case the `function_index`
2699            // should be a non-`None` value.
2700            let function_index = store
2701                .concurrent_state_mut()
2702                .get_mut(guest_task)?
2703                .function_index
2704                .unwrap();
2705            self.id()
2706                .get_mut(store)
2707                .post_return_arg_set(function_index, post_return_arg);
2708        }
2709
2710        let state = store.concurrent_state_mut();
2711        let task = state.get_mut(guest_task)?;
2712
2713        if let Caller::Host { tx, .. } = &mut task.caller {
2714            if let Some(tx) = tx.take() {
2715                _ = tx.send(result);
2716            }
2717        } else {
2718            task.result = Some(result);
2719            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2720        }
2721
2722        Ok(())
2723    }
2724
2725    /// Implements the `waitable-set.new` intrinsic.
2726    pub(crate) fn waitable_set_new(
2727        self,
2728        store: &mut StoreOpaque,
2729        caller_instance: RuntimeComponentInstanceIndex,
2730    ) -> Result<u32> {
2731        self.id().get_mut(store).check_may_leave(caller_instance)?;
2732        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2733        let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2734            .waitable_set_insert(set.rep())?;
2735        log::trace!("new waitable set {set:?} (handle {handle})");
2736        Ok(handle)
2737    }
2738
2739    /// Implements the `waitable-set.drop` intrinsic.
2740    pub(crate) fn waitable_set_drop(
2741        self,
2742        store: &mut StoreOpaque,
2743        caller_instance: RuntimeComponentInstanceIndex,
2744        set: u32,
2745    ) -> Result<()> {
2746        self.id().get_mut(store).check_may_leave(caller_instance)?;
2747        let rep =
2748            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2749
2750        log::trace!("drop waitable set {rep} (handle {set})");
2751
2752        let set = store
2753            .concurrent_state_mut()
2754            .delete(TableId::<WaitableSet>::new(rep))?;
2755
2756        if !set.waiting.is_empty() {
2757            bail!("cannot drop waitable set with waiters");
2758        }
2759
2760        Ok(())
2761    }
2762
2763    /// Implements the `waitable.join` intrinsic.
2764    pub(crate) fn waitable_join(
2765        self,
2766        store: &mut StoreOpaque,
2767        caller_instance: RuntimeComponentInstanceIndex,
2768        waitable_handle: u32,
2769        set_handle: u32,
2770    ) -> Result<()> {
2771        let mut instance = self.id().get_mut(store);
2772        instance.check_may_leave(caller_instance)?;
2773        let waitable =
2774            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2775
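        // Per the Component Model canonical ABI, a `set_handle` of zero means
        // the waitable should be removed from whatever set it currently
        // belongs to, rather than joined to a new one.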
2776        let set = if set_handle == 0 {
2777            None
2778        } else {
2779            let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2780
2781            Some(TableId::<WaitableSet>::new(set))
2782        };
2783
2784        log::trace!(
2785            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2786        );
2787
2788        waitable.join(store.concurrent_state_mut(), set)
2789    }
2790
2791    /// Implements the `subtask.drop` intrinsic.
2792    pub(crate) fn subtask_drop(
2793        self,
2794        store: &mut StoreOpaque,
2795        caller_instance: RuntimeComponentInstanceIndex,
2796        task_id: u32,
2797    ) -> Result<()> {
2798        self.id().get_mut(store).check_may_leave(caller_instance)?;
2799        self.waitable_join(store, caller_instance, task_id, 0)?;
2800
2801        let (rep, is_host) =
2802            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2803
2804        let concurrent_state = store.concurrent_state_mut();
2805        let (waitable, expected_caller_instance, delete) = if is_host {
2806            let id = TableId::<HostTask>::new(rep);
2807            let task = concurrent_state.get_mut(id)?;
2808            if task.join_handle.is_some() {
2809                bail!("cannot drop a subtask which has not yet resolved");
2810            }
2811            (Waitable::Host(id), task.caller_instance, true)
2812        } else {
2813            let id = TableId::<GuestTask>::new(rep);
2814            let task = concurrent_state.get_mut(id)?;
2815            if task.lift_result.is_some() {
2816                bail!("cannot drop a subtask which has not yet resolved");
2817            }
2818            if let Caller::Guest { instance, .. } = &task.caller {
2819                (Waitable::Guest(id), *instance, task.exited)
2820            } else {
2821                unreachable!()
2822            }
2823        };
2824
2825        waitable.common(concurrent_state)?.handle = None;
2826
2827        if waitable.take_event(concurrent_state)?.is_some() {
2828            bail!("cannot drop a subtask with an undelivered event");
2829        }
2830
2831        if delete {
2832            waitable.delete_from(concurrent_state)?;
2833        }
2834
2835        // Since waitables can neither be passed between instances nor forged,
2836        // this should never fail unless there's a bug in Wasmtime, but we check
2837        // here to be sure:
2838        assert_eq!(expected_caller_instance, caller_instance);
2839        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2840        Ok(())
2841    }
2842
2843    /// Implements the `waitable-set.wait` intrinsic.
2844    pub(crate) fn waitable_set_wait(
2845        self,
2846        store: &mut StoreOpaque,
2847        caller: RuntimeComponentInstanceIndex,
2848        options: OptionsIndex,
2849        set: u32,
2850        payload: u32,
2851    ) -> Result<u32> {
2852        self.id().get(store).check_may_leave(caller)?;
2853        let &CanonicalOptions {
2854            cancellable,
2855            instance: caller_instance,
2856            ..
2857        } = &self.id().get(store).component().env_component().options[options];
2858        let rep =
2859            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2860
2861        self.waitable_check(
2862            store,
2863            cancellable,
2864            WaitableCheck::Wait(WaitableCheckParams {
2865                set: TableId::new(rep),
2866                options,
2867                payload,
2868            }),
2869        )
2870    }
2871
2872    /// Implements the `waitable-set.poll` intrinsic.
2873    pub(crate) fn waitable_set_poll(
2874        self,
2875        store: &mut StoreOpaque,
2876        caller: RuntimeComponentInstanceIndex,
2877        options: OptionsIndex,
2878        set: u32,
2879        payload: u32,
2880    ) -> Result<u32> {
2881        self.id().get(store).check_may_leave(caller)?;
2882        let &CanonicalOptions {
2883            cancellable,
2884            instance: caller_instance,
2885            ..
2886        } = &self.id().get(store).component().env_component().options[options];
2887        let rep =
2888            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2889
2890        self.waitable_check(
2891            store,
2892            cancellable,
2893            WaitableCheck::Poll(WaitableCheckParams {
2894                set: TableId::new(rep),
2895                options,
2896                payload,
2897            }),
2898        )
2899    }
2900
2901    /// Implements the `thread.index` intrinsic.
2902    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2903        let thread_id = store
2904            .concurrent_state_mut()
2905            .guest_thread
2906            .ok_or_else(|| anyhow!("no current thread"))?
2907            .thread;
2908        // The unwrap is safe because `instance_rep` must be `Some` by this point
2909        Ok(store
2910            .concurrent_state_mut()
2911            .get_mut(thread_id)?
2912            .instance_rep
2913            .unwrap())
2914    }
2915
2916    /// Implements the `thread.new-indirect` intrinsic.
2917    pub(crate) fn thread_new_indirect<T: 'static>(
2918        self,
2919        mut store: StoreContextMut<T>,
2920        runtime_instance: RuntimeComponentInstanceIndex,
2921        _func_ty_idx: TypeFuncIndex, // currently unused
2922        start_func_table_idx: RuntimeTableIndex,
2923        start_func_idx: u32,
2924        context: i32,
2925    ) -> Result<u32> {
2926        self.id().get(store.0).check_may_leave(runtime_instance)?;
2927
2928        log::trace!("creating new thread");
2929
2930        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2931        let (instance, registry) = self.id().get_mut_and_registry(store.0);
2932        let callee = instance
2933            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
2934            .ok_or_else(|| {
2935                anyhow!("the start function index points to an uninitialized function")
2936            })?;
2937        if callee.type_index(store.0) != start_func_ty.type_index() {
2938            bail!(
2939                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2940            );
2941        }
2942
2943        let token = StoreToken::new(store.as_context_mut());
2944        let start_func = Box::new(
2945            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2946                let old_thread = store
2947                    .concurrent_state_mut()
2948                    .guest_thread
2949                    .replace(guest_thread);
2950                log::trace!(
2951                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2952                );
2953
2954                store.maybe_push_call_context(guest_thread.task)?;
2955
2956                let mut store = token.as_context_mut(store);
2957                let mut params = [ValRaw::i32(context)];
                // Use `call_unchecked` rather than `call` or `call_async`, as
                // we don't want to run the function on a separate fiber if
                // we're running in an async store.
2960                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2961
2962                store.0.maybe_pop_call_context(guest_thread.task)?;
2963
2964                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2965                log::trace!("explicit thread {guest_thread:?} completed");
2966                let state = store.0.concurrent_state_mut();
2967                let task = state.get_mut(guest_thread.task)?;
2968                if task.threads.is_empty() && !task.returned_or_cancelled() {
2969                    bail!(Trap::NoAsyncResult);
2970                }
2971                state.guest_thread = old_thread;
                if let Some(t) = old_thread {
                    state.get_mut(t.thread).unwrap().state = GuestThreadState::Running;
                }
2974                if state.get_mut(guest_thread.task)?.ready_to_delete() {
2975                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2976                }
2977                log::trace!("thread start: restored {old_thread:?} as current thread");
2978
2979                Ok(())
2980            },
2981        );
2982
2983        let state = store.0.concurrent_state_mut();
2984        let current_thread = state.guest_thread.unwrap();
2985        let parent_task = current_thread.task;
2986
2987        let new_thread = GuestThread::new_explicit(parent_task, start_func);
2988        let thread_id = state.push(new_thread)?;
2989        state.get_mut(parent_task)?.threads.insert(thread_id);
2990
2991        log::trace!("new thread with id {thread_id:?} created");
2992
2993        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
2994    }
2995
2996    pub(crate) fn resume_suspended_thread(
2997        self,
2998        store: &mut StoreOpaque,
2999        runtime_instance: RuntimeComponentInstanceIndex,
3000        thread_idx: u32,
3001        high_priority: bool,
3002    ) -> Result<()> {
3003        let thread_id =
3004            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3005        let state = store.concurrent_state_mut();
3006        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3007        let thread = state.get_mut(guest_thread.thread)?;
3008
3009        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3010            GuestThreadState::NotStartedExplicit(start_func) => {
3011                log::trace!("starting thread {guest_thread:?}");
3012                let guest_call = WorkItem::GuestCall(GuestCall {
3013                    thread: guest_thread,
3014                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3015                        start_func(store, guest_thread)
3016                    })),
3017                });
3018                store
3019                    .concurrent_state_mut()
3020                    .push_work_item(guest_call, high_priority);
3021            }
3022            GuestThreadState::Suspended(fiber) => {
3023                log::trace!("resuming thread {thread_id:?} that was suspended");
3024                store
3025                    .concurrent_state_mut()
3026                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3027            }
3028            _ => {
3029                bail!("cannot resume thread which is not suspended");
3030            }
3031        }
3032        Ok(())
3033    }
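
    // `resume_suspended_thread` above takes ownership of the boxed start
    // function or suspended fiber by swapping `Running` into the state slot
    // with `mem::replace`.  A minimal sketch of that take-by-swap pattern on
    // a toy state machine (names here are illustrative, not part of this
    // module):
    //
    //     use std::mem;
    //
    //     enum State {
    //         NotStarted(Box<dyn FnOnce() + Send>),
    //         Running,
    //     }
    //
    //     fn start(slot: &mut State) {
    //         // Move the old state out, leaving `Running` behind, so the
    //         // non-`Copy` closure can be consumed without cloning.
    //         match mem::replace(slot, State::Running) {
    //             State::NotStarted(f) => f(),
    //             State::Running => panic!("already started"),
    //         }
    //     }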
3034
3035    fn add_guest_thread_to_instance_table(
3036        self,
3037        thread_id: TableId<GuestThread>,
3038        store: &mut StoreOpaque,
3039        runtime_instance: RuntimeComponentInstanceIndex,
3040    ) -> Result<u32> {
3041        let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3042            .guest_thread_insert(thread_id.rep())?;
3043        store
3044            .concurrent_state_mut()
3045            .get_mut(thread_id)?
3046            .instance_rep = Some(guest_id);
3047        Ok(guest_id)
3048    }
3049
3050    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3051    /// and `thread.switch-to` intrinsics.
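    ///
    /// `yielding` distinguishes the `yield*` intrinsics (the thread remains
    /// runnable) from `suspend`/`switch-to` (the thread stays suspended until
    /// explicitly resumed), and `to_thread` carries the target thread for the
    /// `*-to` variants, which is scheduled to resume before this thread
    /// suspends.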
3052    pub(crate) fn suspension_intrinsic(
3053        self,
3054        store: &mut StoreOpaque,
3055        caller: RuntimeComponentInstanceIndex,
3056        cancellable: bool,
3057        yielding: bool,
3058        to_thread: Option<u32>,
3059    ) -> Result<WaitResult> {
3060        // There could be a pending cancellation from a previous uncancellable wait
3061        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3062            return Ok(WaitResult::Cancelled);
3063        }
3064
3065        self.id().get(store).check_may_leave(caller)?;
3066
3067        if let Some(thread) = to_thread {
3068            self.resume_suspended_thread(store, caller, thread, true)?;
3069        }
3070
3071        let state = store.concurrent_state_mut();
3072        let guest_thread = state.guest_thread.unwrap();
3073        let reason = if yielding {
3074            SuspendReason::Yielding {
3075                thread: guest_thread,
3076            }
3077        } else {
3078            SuspendReason::ExplicitlySuspending {
3079                thread: guest_thread,
3080            }
3081        };
3082
3083        store.suspend(reason)?;
3084
3085        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3086            Ok(WaitResult::Cancelled)
3087        } else {
3088            Ok(WaitResult::Completed)
3089        }
3090    }
3091
3092    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3093    fn waitable_check(
3094        self,
3095        store: &mut StoreOpaque,
3096        cancellable: bool,
3097        check: WaitableCheck,
3098    ) -> Result<u32> {
3099        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3100
3101        let (wait, set) = match &check {
3102            WaitableCheck::Wait(params) => (true, Some(params.set)),
3103            WaitableCheck::Poll(params) => (false, Some(params.set)),
3104        };
3105
3106        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3107        // First, suspend this fiber, allowing any other threads to run.
3108        store.suspend(SuspendReason::Yielding {
3109            thread: guest_thread,
3110        })?;
3111
        log::trace!("waitable check for {guest_thread:?}; set {set:?}, after yield");
3113
3114        let state = store.concurrent_state_mut();
3115        let task = state.get_mut(guest_thread.task)?;
3116
3117        // If we're waiting, and there are no events immediately available,
3118        // suspend the fiber until that changes.
3119        if wait {
3120            let set = set.unwrap();
3121
3122            if (task.event.is_none()
3123                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3124                && state.get_mut(set)?.ready.is_empty()
3125            {
3126                if cancellable {
3127                    let old = state
3128                        .get_mut(guest_thread.thread)?
3129                        .wake_on_cancel
3130                        .replace(set);
3131                    assert!(old.is_none());
3132                }
3133
3134                store.suspend(SuspendReason::Waiting {
3135                    set,
3136                    thread: guest_thread,
3137                })?;
3138            }
3139        }
3140
3141        log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3142
3143        let result = match check {
3144            // Deliver any pending events to the guest and return.
3145            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3146                let event =
3147                    self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3148
3149                let (ordinal, handle, result) = if wait {
3150                    let (event, waitable) = event.unwrap();
3151                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3152                    let (ordinal, result) = event.parts();
3153                    (ordinal, handle, result)
                } else if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                };
3169                let memory = self.options_memory_mut(store, params.options);
3170                let ptr =
3171                    func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
3172                memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3173                memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3174                Ok(ordinal)
3175            }
3176        };
3177
3178        result
3179    }
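
    // The tail of `waitable_check` above reports an event to the guest by
    // writing two little-endian `u32`s -- the waitable handle and the
    // event-specific result code -- into the guest-supplied `payload`
    // pointer, and returning the event ordinal.  A sketch of that encoding
    // with a hypothetical helper (not part of this module):
    //
    //     fn encode_event_payload(handle: u32, result: u32) -> [u8; 8] {
    //         let mut buf = [0u8; 8];
    //         buf[0..4].copy_from_slice(&handle.to_le_bytes());
    //         buf[4..8].copy_from_slice(&result.to_le_bytes());
    //         buf
    //     }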
3180
3181    /// Implements the `subtask.cancel` intrinsic.
3182    pub(crate) fn subtask_cancel(
3183        self,
3184        store: &mut StoreOpaque,
3185        caller_instance: RuntimeComponentInstanceIndex,
3186        async_: bool,
3187        task_id: u32,
3188    ) -> Result<u32> {
3189        self.id().get(store).check_may_leave(caller_instance)?;
3190        let (rep, is_host) =
3191            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
3192        let (waitable, expected_caller_instance) = if is_host {
3193            let id = TableId::<HostTask>::new(rep);
3194            (
3195                Waitable::Host(id),
3196                store.concurrent_state_mut().get_mut(id)?.caller_instance,
3197            )
3198        } else {
3199            let id = TableId::<GuestTask>::new(rep);
3200            if let Caller::Guest { instance, .. } =
3201                &store.concurrent_state_mut().get_mut(id)?.caller
3202            {
3203                (Waitable::Guest(id), *instance)
3204            } else {
3205                unreachable!()
3206            }
3207        };
3208        // Since waitables can neither be passed between instances nor forged,
3209        // this should never fail unless there's a bug in Wasmtime, but we check
3210        // here to be sure:
3211        assert_eq!(expected_caller_instance, caller_instance);
3212
3213        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3214
3215        let concurrent_state = store.concurrent_state_mut();
3216        if let Waitable::Host(host_task) = waitable {
3217            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3218                handle.abort();
3219                return Ok(Status::ReturnCancelled as u32);
3220            }
3221        } else {
3222            let caller = concurrent_state.guest_thread.unwrap();
3223            let guest_task = TableId::<GuestTask>::new(rep);
3224            let task = concurrent_state.get_mut(guest_task)?;
3225            if !task.already_lowered_parameters() {
3226                task.lower_params = None;
3227                task.lift_result = None;
3228
3229                // Not yet started; cancel and remove from pending
3230                let callee_instance = task.instance;
3231
3232                let pending = &mut concurrent_state.instance_state(callee_instance).pending;
3233                let pending_count = pending.len();
3234                pending.retain(|thread, _| thread.task != guest_task);
3235                // If there were no pending threads for this task, we're in an error state
3236                if pending.len() == pending_count {
3237                    bail!("`subtask.cancel` called after terminal status delivered");
3238                }
3239                return Ok(Status::StartCancelled as u32);
3240            } else if !task.returned_or_cancelled() {
3241                // Started, but not yet returned or cancelled; send the
3242                // `CANCELLED` event
3243                task.cancel_sent = true;
3244                // Note that this might overwrite an event that was set earlier
3245                // (e.g. `Event::None` if the task is yielding, or
3246                // `Event::Cancelled` if it was already cancelled), but that's
3247                // okay -- this should supersede the previous state.
3248                task.event = Some(Event::Cancelled);
3249                for thread in task.threads.clone() {
3250                    let thread = QualifiedThreadId {
3251                        task: guest_task,
3252                        thread,
3253                    };
3254                    if let Some(set) = concurrent_state
3255                        .get_mut(thread.thread)
3256                        .unwrap()
3257                        .wake_on_cancel
3258                        .take()
3259                    {
3260                        let item = match concurrent_state
3261                            .get_mut(set)?
3262                            .waiting
3263                            .remove(&thread)
3264                            .unwrap()
3265                        {
3266                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3267                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3268                                thread,
3269                                kind: GuestCallKind::DeliverEvent {
3270                                    instance,
3271                                    set: None,
3272                                },
3273                            }),
3274                        };
3275                        concurrent_state.push_high_priority(item);
3276
3277                        store.suspend(SuspendReason::Yielding { thread: caller })?;
3278                        break;
3279                    }
3280                }
3281
3282                let concurrent_state = store.concurrent_state_mut();
3283                let task = concurrent_state.get_mut(guest_task)?;
3284                if !task.returned_or_cancelled() {
3285                    if async_ {
3286                        return Ok(BLOCKED);
3287                    } else {
3288                        store.wait_for_event(Waitable::Guest(guest_task))?;
3289                    }
3290                }
3291            }
3292        }
3293
3294        let event = waitable.take_event(store.concurrent_state_mut())?;
3295        if let Some(Event::Subtask {
3296            status: status @ (Status::Returned | Status::ReturnCancelled),
3297        }) = event
3298        {
3299            Ok(status as u32)
3300        } else {
3301            bail!("`subtask.cancel` called after terminal status delivered");
3302        }
3303    }
3304
3305    pub(crate) fn context_get(
3306        self,
3307        store: &mut StoreOpaque,
3308        caller: RuntimeComponentInstanceIndex,
3309        slot: u32,
3310    ) -> Result<u32> {
3311        self.id().get(store).check_may_leave(caller)?;
3312        store.concurrent_state_mut().context_get(slot)
3313    }
3314
3315    pub(crate) fn context_set(
3316        self,
3317        store: &mut StoreOpaque,
3318        caller: RuntimeComponentInstanceIndex,
3319        slot: u32,
3320        value: u32,
3321    ) -> Result<()> {
3322        self.id().get(store).check_may_leave(caller)?;
3323        store.concurrent_state_mut().context_set(slot, value)
3324    }
3325}
3326
3327/// Trait representing component model ABI async intrinsics and fused adapter
3328/// helper functions.
3329///
3330/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3331/// which must be valid for at least the duration of the call (and possibly for
3332/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3333/// pointers used for async calls).
3334pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
3340    unsafe fn prepare_call(
3341        &mut self,
3342        instance: Instance,
3343        memory: *mut VMMemoryDefinition,
3344        start: *mut VMFuncRef,
3345        return_: *mut VMFuncRef,
3346        caller_instance: RuntimeComponentInstanceIndex,
3347        callee_instance: RuntimeComponentInstanceIndex,
3348        task_return_type: TypeTupleIndex,
3349        string_encoding: u8,
        result_count_or_max_if_async: u32,
3351        storage: *mut ValRaw,
3352        storage_len: usize,
3353    ) -> Result<()>;
3354
3355    /// A helper function for fused adapter modules involving calls where the
3356    /// caller is sync-lowered but the callee is async-lifted.
3357    unsafe fn sync_start(
3358        &mut self,
3359        instance: Instance,
3360        callback: *mut VMFuncRef,
3361        callee: *mut VMFuncRef,
3362        param_count: u32,
3363        storage: *mut MaybeUninit<ValRaw>,
3364        storage_len: usize,
3365    ) -> Result<()>;
3366
3367    /// A helper function for fused adapter modules involving calls where the
3368    /// caller is async-lowered.
3369    unsafe fn async_start(
3370        &mut self,
3371        instance: Instance,
3372        callback: *mut VMFuncRef,
3373        post_return: *mut VMFuncRef,
3374        callee: *mut VMFuncRef,
3375        param_count: u32,
3376        result_count: u32,
3377        flags: u32,
3378    ) -> Result<u32>;
3379
3380    /// The `future.write` intrinsic.
3381    fn future_write(
3382        &mut self,
3383        instance: Instance,
3384        caller: RuntimeComponentInstanceIndex,
3385        ty: TypeFutureTableIndex,
3386        options: OptionsIndex,
3387        future: u32,
3388        address: u32,
3389    ) -> Result<u32>;
3390
3391    /// The `future.read` intrinsic.
3392    fn future_read(
3393        &mut self,
3394        instance: Instance,
3395        caller: RuntimeComponentInstanceIndex,
3396        ty: TypeFutureTableIndex,
3397        options: OptionsIndex,
3398        future: u32,
3399        address: u32,
3400    ) -> Result<u32>;
3401
3402    /// The `future.drop-writable` intrinsic.
3403    fn future_drop_writable(
3404        &mut self,
3405        instance: Instance,
3406        caller: RuntimeComponentInstanceIndex,
3407        ty: TypeFutureTableIndex,
3408        writer: u32,
3409    ) -> Result<()>;
3410
3411    /// The `stream.write` intrinsic.
3412    fn stream_write(
3413        &mut self,
3414        instance: Instance,
3415        caller: RuntimeComponentInstanceIndex,
3416        ty: TypeStreamTableIndex,
3417        options: OptionsIndex,
3418        stream: u32,
3419        address: u32,
3420        count: u32,
3421    ) -> Result<u32>;
3422
3423    /// The `stream.read` intrinsic.
3424    fn stream_read(
3425        &mut self,
3426        instance: Instance,
3427        caller: RuntimeComponentInstanceIndex,
3428        ty: TypeStreamTableIndex,
3429        options: OptionsIndex,
3430        stream: u32,
3431        address: u32,
3432        count: u32,
3433    ) -> Result<u32>;
3434
3435    /// The "fast-path" implementation of the `stream.write` intrinsic for
3436    /// "flat" (i.e. memcpy-able) payloads.
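    ///
    /// "Flat" here means each payload element can be copied directly between
    /// guest memory and host buffers with a plain memcpy; the element layout
    /// is described solely by its size and alignment (see `FlatAbi`).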
3437    fn flat_stream_write(
3438        &mut self,
3439        instance: Instance,
3440        caller: RuntimeComponentInstanceIndex,
3441        ty: TypeStreamTableIndex,
3442        options: OptionsIndex,
3443        payload_size: u32,
3444        payload_align: u32,
3445        stream: u32,
3446        address: u32,
3447        count: u32,
3448    ) -> Result<u32>;
3449
3450    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3451    /// (i.e. memcpy-able) payloads.
3452    fn flat_stream_read(
3453        &mut self,
3454        instance: Instance,
3455        caller: RuntimeComponentInstanceIndex,
3456        ty: TypeStreamTableIndex,
3457        options: OptionsIndex,
3458        payload_size: u32,
3459        payload_align: u32,
3460        stream: u32,
3461        address: u32,
3462        count: u32,
3463    ) -> Result<u32>;
3464
3465    /// The `stream.drop-writable` intrinsic.
3466    fn stream_drop_writable(
3467        &mut self,
3468        instance: Instance,
3469        caller: RuntimeComponentInstanceIndex,
3470        ty: TypeStreamTableIndex,
3471        writer: u32,
3472    ) -> Result<()>;
3473
3474    /// The `error-context.debug-message` intrinsic.
3475    fn error_context_debug_message(
3476        &mut self,
3477        instance: Instance,
3478        caller: RuntimeComponentInstanceIndex,
3479        ty: TypeComponentLocalErrorContextTableIndex,
3480        options: OptionsIndex,
3481        err_ctx_handle: u32,
3482        debug_msg_address: u32,
3483    ) -> Result<()>;
3484
    /// The `thread.new-indirect` intrinsic.
3486    fn thread_new_indirect(
3487        &mut self,
3488        instance: Instance,
3489        caller: RuntimeComponentInstanceIndex,
3490        func_ty_idx: TypeFuncIndex,
3491        start_func_table_idx: RuntimeTableIndex,
3492        start_func_idx: u32,
3493        context: i32,
3494    ) -> Result<u32>;
3495}
3496
3497/// SAFETY: See trait docs.
3498impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3499    unsafe fn prepare_call(
3500        &mut self,
3501        instance: Instance,
3502        memory: *mut VMMemoryDefinition,
3503        start: *mut VMFuncRef,
3504        return_: *mut VMFuncRef,
3505        caller_instance: RuntimeComponentInstanceIndex,
3506        callee_instance: RuntimeComponentInstanceIndex,
3507        task_return_type: TypeTupleIndex,
3508        string_encoding: u8,
3509        result_count_or_max_if_async: u32,
3510        storage: *mut ValRaw,
3511        storage_len: usize,
3512    ) -> Result<()> {
3513        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3514        // this method will have ensured that `storage` is a valid
3515        // pointer containing at least `storage_len` items.
3516        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3517
3518        unsafe {
3519            instance.prepare_call(
3520                StoreContextMut(self),
3521                start,
3522                return_,
3523                caller_instance,
3524                callee_instance,
3525                task_return_type,
3526                memory,
3527                string_encoding,
3528                match result_count_or_max_if_async {
3529                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3530                        params,
3531                        has_result: false,
3532                    },
3533                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3534                        params,
3535                        has_result: true,
3536                    },
3537                    result_count => CallerInfo::Sync {
3538                        params,
3539                        result_count,
3540                    },
3541                },
3542            )
3543        }
3544    }
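
    // `result_count_or_max_if_async` above doubles as both a literal result
    // count (sync callers) and a pair of sentinel values (async callers).  A
    // sketch of that decoding pattern with hypothetical sentinel values (the
    // real `PREPARE_ASYNC_*` constants are defined elsewhere in this module
    // and may differ):
    //
    //     const ASYNC_NO_RESULT: u32 = u32::MAX;
    //     const ASYNC_WITH_RESULT: u32 = u32::MAX - 1;
    //
    //     enum CallerKind {
    //         Async { has_result: bool },
    //         Sync { result_count: u32 },
    //     }
    //
    //     fn decode(raw: u32) -> CallerKind {
    //         match raw {
    //             ASYNC_NO_RESULT => CallerKind::Async { has_result: false },
    //             ASYNC_WITH_RESULT => CallerKind::Async { has_result: true },
    //             result_count => CallerKind::Sync { result_count },
    //         }
    //     }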
3545
3546    unsafe fn sync_start(
3547        &mut self,
3548        instance: Instance,
3549        callback: *mut VMFuncRef,
3550        callee: *mut VMFuncRef,
3551        param_count: u32,
3552        storage: *mut MaybeUninit<ValRaw>,
3553        storage_len: usize,
3554    ) -> Result<()> {
3555        unsafe {
3556            instance
3557                .start_call(
3558                    StoreContextMut(self),
3559                    callback,
3560                    ptr::null_mut(),
3561                    callee,
3562                    param_count,
3563                    1,
3564                    START_FLAG_ASYNC_CALLEE,
3565                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3566                    // this method will have ensured that `storage` is a valid
3567                    // pointer containing at least `storage_len` items.
3568                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3569                )
3570                .map(drop)
3571        }
3572    }
3573
3574    unsafe fn async_start(
3575        &mut self,
3576        instance: Instance,
3577        callback: *mut VMFuncRef,
3578        post_return: *mut VMFuncRef,
3579        callee: *mut VMFuncRef,
3580        param_count: u32,
3581        result_count: u32,
3582        flags: u32,
3583    ) -> Result<u32> {
3584        unsafe {
3585            instance.start_call(
3586                StoreContextMut(self),
3587                callback,
3588                post_return,
3589                callee,
3590                param_count,
3591                result_count,
3592                flags,
3593                None,
3594            )
3595        }
3596    }
3597
3598    fn future_write(
3599        &mut self,
3600        instance: Instance,
3601        caller: RuntimeComponentInstanceIndex,
3602        ty: TypeFutureTableIndex,
3603        options: OptionsIndex,
3604        future: u32,
3605        address: u32,
3606    ) -> Result<u32> {
3607        instance.id().get(self).check_may_leave(caller)?;
3608        instance
3609            .guest_write(
3610                StoreContextMut(self),
3611                caller,
3612                TransmitIndex::Future(ty),
3613                options,
3614                None,
3615                future,
3616                address,
3617                1,
3618            )
3619            .map(|result| result.encode())
3620    }
3621
3622    fn future_read(
3623        &mut self,
3624        instance: Instance,
3625        caller: RuntimeComponentInstanceIndex,
3626        ty: TypeFutureTableIndex,
3627        options: OptionsIndex,
3628        future: u32,
3629        address: u32,
3630    ) -> Result<u32> {
3631        instance.id().get(self).check_may_leave(caller)?;
3632        instance
3633            .guest_read(
3634                StoreContextMut(self),
3635                caller,
3636                TransmitIndex::Future(ty),
3637                options,
3638                None,
3639                future,
3640                address,
3641                1,
3642            )
3643            .map(|result| result.encode())
3644    }
3645
3646    fn stream_write(
3647        &mut self,
3648        instance: Instance,
3649        caller: RuntimeComponentInstanceIndex,
3650        ty: TypeStreamTableIndex,
3651        options: OptionsIndex,
3652        stream: u32,
3653        address: u32,
3654        count: u32,
3655    ) -> Result<u32> {
3656        instance.id().get(self).check_may_leave(caller)?;
3657        instance
3658            .guest_write(
3659                StoreContextMut(self),
3660                caller,
3661                TransmitIndex::Stream(ty),
3662                options,
3663                None,
3664                stream,
3665                address,
3666                count,
3667            )
3668            .map(|result| result.encode())
3669    }
3670
3671    fn stream_read(
3672        &mut self,
3673        instance: Instance,
3674        caller: RuntimeComponentInstanceIndex,
3675        ty: TypeStreamTableIndex,
3676        options: OptionsIndex,
3677        stream: u32,
3678        address: u32,
3679        count: u32,
3680    ) -> Result<u32> {
3681        instance.id().get(self).check_may_leave(caller)?;
3682        instance
3683            .guest_read(
3684                StoreContextMut(self),
3685                caller,
3686                TransmitIndex::Stream(ty),
3687                options,
3688                None,
3689                stream,
3690                address,
3691                count,
3692            )
3693            .map(|result| result.encode())
3694    }
3695
3696    fn future_drop_writable(
3697        &mut self,
3698        instance: Instance,
3699        caller: RuntimeComponentInstanceIndex,
3700        ty: TypeFutureTableIndex,
3701        writer: u32,
3702    ) -> Result<()> {
3703        instance.id().get(self).check_may_leave(caller)?;
3704        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3705    }
3706
3707    fn flat_stream_write(
3708        &mut self,
3709        instance: Instance,
3710        caller: RuntimeComponentInstanceIndex,
3711        ty: TypeStreamTableIndex,
3712        options: OptionsIndex,
3713        payload_size: u32,
3714        payload_align: u32,
3715        stream: u32,
3716        address: u32,
3717        count: u32,
3718    ) -> Result<u32> {
3719        instance.id().get(self).check_may_leave(caller)?;
3720        instance
3721            .guest_write(
3722                StoreContextMut(self),
3723                caller,
3724                TransmitIndex::Stream(ty),
3725                options,
3726                Some(FlatAbi {
3727                    size: payload_size,
3728                    align: payload_align,
3729                }),
3730                stream,
3731                address,
3732                count,
3733            )
3734            .map(|result| result.encode())
3735    }
3736
3737    fn flat_stream_read(
3738        &mut self,
3739        instance: Instance,
3740        caller: RuntimeComponentInstanceIndex,
3741        ty: TypeStreamTableIndex,
3742        options: OptionsIndex,
3743        payload_size: u32,
3744        payload_align: u32,
3745        stream: u32,
3746        address: u32,
3747        count: u32,
3748    ) -> Result<u32> {
3749        instance.id().get(self).check_may_leave(caller)?;
3750        instance
3751            .guest_read(
3752                StoreContextMut(self),
3753                caller,
3754                TransmitIndex::Stream(ty),
3755                options,
3756                Some(FlatAbi {
3757                    size: payload_size,
3758                    align: payload_align,
3759                }),
3760                stream,
3761                address,
3762                count,
3763            )
3764            .map(|result| result.encode())
3765    }
3766
3767    fn stream_drop_writable(
3768        &mut self,
3769        instance: Instance,
3770        caller: RuntimeComponentInstanceIndex,
3771        ty: TypeStreamTableIndex,
3772        writer: u32,
3773    ) -> Result<()> {
3774        instance.id().get(self).check_may_leave(caller)?;
3775        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3776    }
3777
3778    fn error_context_debug_message(
3779        &mut self,
3780        instance: Instance,
3781        caller: RuntimeComponentInstanceIndex,
3782        ty: TypeComponentLocalErrorContextTableIndex,
3783        options: OptionsIndex,
3784        err_ctx_handle: u32,
3785        debug_msg_address: u32,
3786    ) -> Result<()> {
3787        instance.id().get(self).check_may_leave(caller)?;
3788        instance.error_context_debug_message(
3789            StoreContextMut(self),
3790            ty,
3791            options,
3792            err_ctx_handle,
3793            debug_msg_address,
3794        )
3795    }
3796
3797    fn thread_new_indirect(
3798        &mut self,
3799        instance: Instance,
3800        caller: RuntimeComponentInstanceIndex,
3801        func_ty_idx: TypeFuncIndex,
3802        start_func_table_idx: RuntimeTableIndex,
3803        start_func_idx: u32,
3804        context: i32,
3805    ) -> Result<u32> {
3806        instance.thread_new_indirect(
3807            StoreContextMut(self),
3808            caller,
3809            func_ty_idx,
3810            start_func_table_idx,
3811            start_func_idx,
3812            context,
3813        )
3814    }
3815}
3816
3817type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3818
3819/// Represents the state of a pending host task.
3820struct HostTask {
3821    common: WaitableCommon,
3822    caller_instance: RuntimeComponentInstanceIndex,
3823    join_handle: Option<JoinHandle>,
3824}
3825
3826impl HostTask {
3827    fn new(
3828        caller_instance: RuntimeComponentInstanceIndex,
3829        join_handle: Option<JoinHandle>,
3830    ) -> Self {
3831        Self {
3832            common: WaitableCommon::default(),
3833            caller_instance,
3834            join_handle,
3835        }
3836    }
3837}
3838
3839impl TableDebug for HostTask {
3840    fn type_name() -> &'static str {
3841        "HostTask"
3842    }
3843}
3844
3845type CallbackFn = Box<
3846    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3847        + Send
3848        + Sync
3849        + 'static,
3850>;
3851
3852/// Represents the caller of a given guest task.
3853enum Caller {
3854    /// The host called the guest task.
3855    Host {
3856        /// If present, may be used to deliver the result.
3857        tx: Option<oneshot::Sender<LiftedResult>>,
3858        /// Channel to notify once all subtasks spawned by this caller have
3859        /// completed.
3860        ///
3861        /// Note that we'll never actually send anything to this channel;
3862        /// dropping it when the refcount goes to zero is sufficient to notify
3863        /// the receiver.
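        /// (A sketch of this drop-to-notify pattern follows this enum.)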
3864        exit_tx: Arc<oneshot::Sender<()>>,
3865        /// If true, there's a host future that must be dropped before the task
3866        /// can be deleted.
3867        host_future_present: bool,
3868        /// If true, call `post-return` function (if any) automatically.
3869        call_post_return_automatically: bool,
3870    },
    /// Another guest thread called the guest task.
    Guest {
        /// The qualified ID of the calling guest thread.
3874        thread: QualifiedThreadId,
3875        /// The instance to use to enforce reentrance rules.
3876        ///
3877        /// Note that this might not be the same as the instance the caller task
3878        /// started executing in given that one or more synchronous guest->guest
3879        /// calls may have occurred involving multiple instances.
3880        instance: RuntimeComponentInstanceIndex,
3881    },
3882}
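
// The `Caller::Host::exit_tx` field above relies on a `oneshot` channel
// notifying its receiver when the sender is dropped, without any value ever
// being sent.  A minimal sketch of that drop-to-notify pattern using the
// `futures` crate (none of these names are part of Wasmtime's API):
//
//     use std::sync::Arc;
//     use futures::channel::oneshot;
//     use futures::executor::block_on;
//
//     let (exit_tx, exit_rx) = oneshot::channel::<()>();
//     let exit_tx = Arc::new(exit_tx);
//     let clone = exit_tx.clone();
//
//     // Dropping the last `Arc` clone drops the `Sender`, which completes
//     // the receiver with `Err(Canceled)` -- that error is the notification.
//     drop(exit_tx);
//     drop(clone);
//     assert!(block_on(exit_rx).is_err());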
3883
3884/// Represents a closure and related canonical ABI parameters required to
3885/// validate a `task.return` call at runtime and lift the result.
3886struct LiftResult {
3887    lift: RawLift,
3888    ty: TypeTupleIndex,
3889    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3890    string_encoding: StringEncoding,
3891}
3892
3893/// The table ID for a guest thread, qualified by the task to which it belongs.
3894///
/// This exists to minimize table lookups and avoid having to pass stores
/// around mutably for the common case of identifying the task to which a
/// thread belongs.
3897#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3898struct QualifiedThreadId {
3899    task: TableId<GuestTask>,
3900    thread: TableId<GuestThread>,
3901}
3902
3903impl QualifiedThreadId {
3904    fn qualify(
3905        state: &mut ConcurrentState,
3906        thread: TableId<GuestThread>,
3907    ) -> Result<QualifiedThreadId> {
3908        Ok(QualifiedThreadId {
3909            task: state.get_mut(thread)?.parent_task,
3910            thread,
3911        })
3912    }
3913}
3914
3915impl fmt::Debug for QualifiedThreadId {
3916    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3917        f.debug_tuple("QualifiedThreadId")
3918            .field(&self.task.rep())
3919            .field(&self.thread.rep())
3920            .finish()
3921    }
3922}
3923
3924enum GuestThreadState {
3925    NotStartedImplicit,
3926    NotStartedExplicit(
3927        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
3928    ),
3929    Running,
3930    Suspended(StoreFiber<'static>),
3931    Pending,
3932    Completed,
3933}
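
/// Represents a single guest thread belonging to a guest task.
///
/// A thread is either created implicitly as part of a guest task (see
/// `new_implicit`) or explicitly via the `thread.new-indirect` intrinsic (see
/// `new_explicit`).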
3934pub struct GuestThread {
3935    /// Context-local state used to implement the `context.{get,set}`
3936    /// intrinsics.
3937    context: [u32; 2],
3938    /// The owning guest task.
3939    parent_task: TableId<GuestTask>,
3940    /// If present, indicates that the thread is currently waiting on the
3941    /// specified set but may be cancelled and woken immediately.
3942    wake_on_cancel: Option<TableId<WaitableSet>>,
3943    /// The execution state of this guest thread
3944    state: GuestThreadState,
3945    /// The index of this thread in the component instance's handle table.
3946    /// This must always be `Some` after initialization.
3947    instance_rep: Option<u32>,
3948}
3949
3950impl GuestThread {
3951    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
3952    /// handle.
3953    fn from_instance(
3954        state: Pin<&mut ComponentInstance>,
3955        caller_instance: RuntimeComponentInstanceIndex,
3956        guest_thread: u32,
3957    ) -> Result<TableId<Self>> {
3958        let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3959        Ok(TableId::new(rep))
3960    }
3961
3962    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3963        Self {
3964            context: [0; 2],
3965            parent_task,
3966            wake_on_cancel: None,
3967            state: GuestThreadState::NotStartedImplicit,
3968            instance_rep: None,
3969        }
3970    }
3971
3972    fn new_explicit(
3973        parent_task: TableId<GuestTask>,
3974        start_func: Box<
3975            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3976        >,
3977    ) -> Self {
3978        Self {
3979            context: [0; 2],
3980            parent_task,
3981            wake_on_cancel: None,
3982            state: GuestThreadState::NotStartedExplicit(start_func),
3983            instance_rep: None,
3984        }
3985    }
3986}
3987
3988impl TableDebug for GuestThread {
3989    fn type_name() -> &'static str {
3990        "GuestThread"
3991    }
3992}
3993
3994enum SyncResult {
3995    NotProduced,
3996    Produced(Option<ValRaw>),
3997    Taken,
3998}
3999
4000impl SyncResult {
4001    fn take(&mut self) -> Option<Option<ValRaw>> {
4002        match mem::replace(self, SyncResult::Taken) {
4003            SyncResult::NotProduced => None,
4004            SyncResult::Produced(val) => Some(val),
4005            SyncResult::Taken => {
4006                panic!("attempted to take a synchronous result that was already taken")
4007            }
4008        }
4009    }
4010}
4011
4012#[derive(Debug)]
4013enum HostFutureState {
4014    NotApplicable,
4015    Live,
4016    Dropped,
4017}
4018
4019/// Represents a pending guest task.
4020pub(crate) struct GuestTask {
4021    /// See `WaitableCommon`
4022    common: WaitableCommon,
4023    /// Closure to lower the parameters passed to this task.
4024    lower_params: Option<RawLower>,
4025    /// See `LiftResult`
4026    lift_result: Option<LiftResult>,
4027    /// A place to stash the type-erased lifted result if it can't be delivered
4028    /// immediately.
4029    result: Option<LiftedResult>,
4030    /// Closure to call the callback function for an async-lifted export, if
4031    /// provided.
4032    callback: Option<CallbackFn>,
4033    /// See `Caller`
4034    caller: Caller,
4035    /// A place to stash the call context for managing resource borrows while
4036    /// switching between guest tasks.
4037    call_context: Option<CallContext>,
4038    /// A place to stash the lowered result for a sync-to-async call until it
4039    /// can be returned to the caller.
4040    sync_result: SyncResult,
4041    /// Whether or not the task has been cancelled (i.e. whether the task is
4042    /// permitted to call `task.cancel`).
4043    cancel_sent: bool,
4044    /// Whether or not we've sent a `Status::Starting` event to any current or
4045    /// future waiters for this waitable.
4046    starting_sent: bool,
4047    /// Pending guest subtasks created by this task (directly or indirectly).
4048    ///
4049    /// This is used to re-parent subtasks which are still running when their
4050    /// parent task is disposed.
4051    subtasks: HashSet<TableId<GuestTask>>,
4052    /// Scratch waitable set used to watch subtasks during synchronous calls.
4053    sync_call_set: TableId<WaitableSet>,
4054    /// The instance to which the exported function for this guest task belongs.
4055    ///
4056    /// Note that the task may do a sync->sync call via a fused adapter which
4057    /// results in that task executing code in a different instance, and it may
4058    /// call host functions and intrinsics from that other instance.
4059    instance: RuntimeComponentInstanceIndex,
4060    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4061    /// delivered to this task.
4062    event: Option<Event>,
4063    /// The `ExportIndex` of the guest function being called, if known.
4064    function_index: Option<ExportIndex>,
4065    /// Whether or not the task has exited.
4066    exited: bool,
4067    /// Threads belonging to this task
4068    threads: HashSet<TableId<GuestThread>>,
4069    /// The state of the host future (if any) that represents this async task;
4070    /// the task cannot be deleted while that future is still live.
4071    host_future_state: HostFutureState,
4072}
4073
4074impl GuestTask {
4075    fn already_lowered_parameters(&self) -> bool {
4076        // We reset `lower_params` after we lower the parameters
4077        self.lower_params.is_none()
4078    }
4079    fn returned_or_cancelled(&self) -> bool {
4080        // We reset `lift_result` after we return or exit
4081        self.lift_result.is_none()
4082    }
4083    fn ready_to_delete(&self) -> bool {
4084        let threads_completed = self.threads.is_empty();
4085        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4086        let pending_completion_event = matches!(
4087            self.common.event,
4088            Some(Event::Subtask {
4089                status: Status::Returned | Status::ReturnCancelled
4090            })
4091        );
4092        let ready = threads_completed
4093            && !has_sync_result
4094            && !pending_completion_event
4095            && !matches!(self.host_future_state, HostFutureState::Live);
4096        log::trace!(
4097            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4098            threads_completed,
4099            has_sync_result,
4100            pending_completion_event,
4101            self.host_future_state
4102        );
4103        ready
4104    }
4105    fn new(
4106        state: &mut ConcurrentState,
4107        lower_params: RawLower,
4108        lift_result: LiftResult,
4109        caller: Caller,
4110        callback: Option<CallbackFn>,
4111        component_instance: RuntimeComponentInstanceIndex,
4112    ) -> Result<Self> {
4113        let sync_call_set = state.push(WaitableSet::default())?;
4114        let host_future_state = match &caller {
4115            Caller::Guest { .. } => HostFutureState::NotApplicable,
4116            Caller::Host {
4117                host_future_present,
4118                ..
4119            } => {
4120                if *host_future_present {
4121                    HostFutureState::Live
4122                } else {
4123                    HostFutureState::NotApplicable
4124                }
4125            }
4126        };
4127        Ok(Self {
4128            common: WaitableCommon::default(),
4129            lower_params: Some(lower_params),
4130            lift_result: Some(lift_result),
4131            result: None,
4132            callback,
4133            caller,
4134            call_context: Some(CallContext::default()),
4135            sync_result: SyncResult::NotProduced,
4136            cancel_sent: false,
4137            starting_sent: false,
4138            subtasks: HashSet::new(),
4139            sync_call_set,
4140            instance: component_instance,
4141            event: None,
4142            function_index: None,
4143            exited: false,
4144            threads: HashSet::new(),
4145            host_future_state,
4146        })
4147    }
4148
4149    /// Dispose of this guest task, reparenting any pending subtasks to the
4150    /// caller.
4151    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4152        // If there are not-yet-delivered completion events for subtasks in
4153        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4154        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4155            if let Some(Event::Subtask {
4156                status: Status::Returned | Status::ReturnCancelled,
4157            }) = waitable.common(state)?.event
4158            {
4159                waitable.delete_from(state)?;
4160            }
4161        }
4162
4163        assert!(self.threads.is_empty());
4164
4165        state.delete(self.sync_call_set)?;
4166
4167        // Reparent any pending subtasks to the caller.
4168        match &self.caller {
4169            Caller::Guest {
4170                thread,
4171                instance: runtime_instance,
4172            } => {
4173                let task_mut = state.get_mut(thread.task)?;
4174                let present = task_mut.subtasks.remove(&me);
4175                assert!(present);
4176
4177                for subtask in &self.subtasks {
4178                    task_mut.subtasks.insert(*subtask);
4179                }
4180
4181                for subtask in &self.subtasks {
4182                    state.get_mut(*subtask)?.caller = Caller::Guest {
4183                        thread: *thread,
4184                        instance: *runtime_instance,
4185                    };
4186                }
4187            }
4188            Caller::Host { exit_tx, .. } => {
4189                for subtask in &self.subtasks {
4190                    state.get_mut(*subtask)?.caller = Caller::Host {
4191                        tx: None,
4192                        // Clone `exit_tx` to ensure that it is only dropped
4193                        // once all transitive subtasks of the host call have
4194                        // exited:
4195                        exit_tx: exit_tx.clone(),
4196                        host_future_present: false,
4197                        call_post_return_automatically: true,
4198                    };
4199                }
4200            }
4201        }
4202
4203        for subtask in self.subtasks {
4204            if state.get_mut(subtask)?.exited {
4205                Waitable::Guest(subtask).delete_from(state)?;
4206            }
4207        }
4208
4209        Ok(())
4210    }
4211
4212    fn call_post_return_automatically(&self) -> bool {
4213        matches!(
4214            self.caller,
4215            Caller::Guest { .. }
4216                | Caller::Host {
4217                    call_post_return_automatically: true,
4218                    ..
4219                }
4220        )
4221    }
4222}
4223
4224impl TableDebug for GuestTask {
4225    fn type_name() -> &'static str {
4226        "GuestTask"
4227    }
4228}
4229
4230/// Represents state common to all kinds of waitables.
4231#[derive(Default)]
4232struct WaitableCommon {
4233    /// The currently pending event for this waitable, if any.
4234    event: Option<Event>,
4235    /// The set to which this waitable belongs, if any.
4236    set: Option<TableId<WaitableSet>>,
4237    /// The handle with which the guest refers to this waitable, if any.
4238    handle: Option<u32>,
4239}
4240
4241/// Represents a Component Model Async `waitable`.
4242#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4243enum Waitable {
4244    /// A host task
4245    Host(TableId<HostTask>),
4246    /// A guest task
4247    Guest(TableId<GuestTask>),
4248    /// The read or write end of a stream or future
4249    Transmit(TableId<TransmitHandle>),
4250}
4251
4252impl Waitable {
4253    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4254    /// handle.
4255    fn from_instance(
4256        state: Pin<&mut ComponentInstance>,
4257        caller_instance: RuntimeComponentInstanceIndex,
4258        waitable: u32,
4259    ) -> Result<Self> {
4260        use crate::runtime::vm::component::Waitable;
4261
4262        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4263
4264        Ok(match kind {
4265            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4266            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4267            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4268        })
4269    }
4270
4271    /// Retrieve the host-visible identifier for this `Waitable`.
4272    fn rep(&self) -> u32 {
4273        match self {
4274            Self::Host(id) => id.rep(),
4275            Self::Guest(id) => id.rep(),
4276            Self::Transmit(id) => id.rep(),
4277        }
4278    }
4279
4280    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4281    /// remove it from any set it may currently belong to (when `set` is
4282    /// `None`).
4283    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4284        log::trace!("waitable {self:?} join set {set:?}");
4285
4286        let old = mem::replace(&mut self.common(state)?.set, set);
4287
4288        if let Some(old) = old {
4289            match *self {
4290                Waitable::Host(id) => state.remove_child(id, old),
4291                Waitable::Guest(id) => state.remove_child(id, old),
4292                Waitable::Transmit(id) => state.remove_child(id, old),
4293            }?;
4294
4295            state.get_mut(old)?.ready.remove(self);
4296        }
4297
4298        if let Some(set) = set {
4299            match *self {
4300                Waitable::Host(id) => state.add_child(id, set),
4301                Waitable::Guest(id) => state.add_child(id, set),
4302                Waitable::Transmit(id) => state.add_child(id, set),
4303            }?;
4304
4305            if self.common(state)?.event.is_some() {
4306                self.mark_ready(state)?;
4307            }
4308        }
4309
4310        Ok(())
4311    }
4312
4313    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4314    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4315        Ok(match self {
4316            Self::Host(id) => &mut state.get_mut(*id)?.common,
4317            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4318            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4319        })
4320    }
4321
4322    /// Set or clear the pending event for this waitable and either deliver it
4323    /// to the first waiter, if any, or mark it as ready to be delivered to the
4324    /// next waiter that arrives.
4325    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4326        log::trace!("set event for {self:?}: {event:?}");
4327        self.common(state)?.event = event;
4328        self.mark_ready(state)
4329    }
4330
4331    /// Take the pending event from this waitable, leaving `None` in its place.
4332    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4333        let common = self.common(state)?;
4334        let event = common.event.take();
4335        if let Some(set) = self.common(state)?.set {
4336            state.get_mut(set)?.ready.remove(self);
4337        }
4338
4339        Ok(event)
4340    }
4341
4342    /// Deliver the current event for this waitable to the first waiter, if any,
4343    /// or else mark it as ready to be delivered to the next waiter that
4344    /// arrives.
4345    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4346        if let Some(set) = self.common(state)?.set {
4347            state.get_mut(set)?.ready.insert(*self);
4348            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4349                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4350                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4351
4352                let item = match mode {
4353                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4354                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4355                        thread,
4356                        kind: GuestCallKind::DeliverEvent {
4357                            instance,
4358                            set: Some(set),
4359                        },
4360                    }),
4361                };
4362                state.push_high_priority(item);
4363            }
4364        }
4365        Ok(())
4366    }
4367
4368    /// Remove this waitable from the instance's rep table.
4369    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4370        match self {
4371            Self::Host(task) => {
4372                log::trace!("delete host task {task:?}");
4373                state.delete(*task)?;
4374            }
4375            Self::Guest(task) => {
4376                log::trace!("delete guest task {task:?}");
4377                state.delete(*task)?.dispose(state, *task)?;
4378            }
4379            Self::Transmit(task) => {
4380                state.delete(*task)?;
4381            }
4382        }
4383
4384        Ok(())
4385    }
4386}
4387
4388impl fmt::Debug for Waitable {
4389    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4390        match self {
4391            Self::Host(id) => write!(f, "{id:?}"),
4392            Self::Guest(id) => write!(f, "{id:?}"),
4393            Self::Transmit(id) => write!(f, "{id:?}"),
4394        }
4395    }
4396}
4397
4398/// Represents a Component Model Async `waitable-set`.
4399#[derive(Default)]
4400struct WaitableSet {
4401    /// Which waitables in this set have pending events, if any.
4402    ready: BTreeSet<Waitable>,
4403    /// Which guest threads are currently waiting on this set, if any.
4404    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4405}
4406
4407impl TableDebug for WaitableSet {
4408    fn type_name() -> &'static str {
4409        "WaitableSet"
4410    }
4411}
4412
4413/// Type-erased closure to lower the parameters for a guest task.
4414type RawLower =
4415    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4416
4417/// Type-erased closure to lift the result for a guest task.
4418type RawLift = Box<
4419    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4420>;
4421
4422/// Type erased result of a guest task which may be downcast to the expected
4423/// type by a host caller (or simply ignored in the case of a guest caller; see
4424/// `DummyResult`).
4425type LiftedResult = Box<dyn Any + Send + Sync>;
4426
4427/// Used to return a result from a `LiftFn` when the actual result has already
4428/// been lowered to a guest task's stack and linear memory.
4429struct DummyResult;
4430
4431/// Represents the Component Model Async state of a (sub-)component instance.
4432#[derive(Default)]
4433struct InstanceState {
4434    /// Backpressure counter for this instance (backpressure enabled while >0)
4435    backpressure: u16,
4436    /// Whether this instance must not currently be (re-)entered
4437    do_not_enter: bool,
4438    /// Pending calls for this instance which require `Self::backpressure` to be
4439    /// zero and `Self::do_not_enter` to be `false` before they can proceed.
4440    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4441}
4442
4443/// Represents the Component Model Async state of a store.
4444pub struct ConcurrentState {
4445    /// The currently running guest thread, if any.
4446    guest_thread: Option<QualifiedThreadId>,
4447
4448    /// The set of pending host and background tasks, if any.
4449    ///
4450    /// See `StoreContextMut::poll_until` for where we temporarily take this
4451    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4452    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4453    /// The table of waitables, waitable sets, etc.
4454    table: AlwaysMut<ResourceTable>,
4455    /// Per (sub-)component instance states.
4456    ///
4457    /// See `InstanceState` for details and note that this map is lazily
4458    /// populated as needed.
4459    // TODO: this can and should be a `PrimaryMap`
4460    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4461    /// The "high priority" work queue for this store's event loop.
4462    high_priority: Vec<WorkItem>,
4463    /// The "low priority" work queue for this store's event loop.
4464    low_priority: Vec<WorkItem>,
4465    /// A place to stash the reason a fiber is suspending so that the code which
4466    /// resumed it will know under what conditions the fiber should be resumed
4467    /// again.
4468    suspend_reason: Option<SuspendReason>,
4469    /// A cached fiber which is waiting for work to do.
4470    ///
4471    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4472    worker: Option<StoreFiber<'static>>,
4473    /// A place to stash the work item for which we're resuming a worker fiber.
4474    worker_item: Option<WorkerItem>,
4475
4476    /// Reference counts for all component error contexts
4477    ///
4478    /// NOTE: it is possible for the global ref count to be *greater* than the sum of
4479    /// (sub)component ref counts as tracked by `error_context_tables`, for
4480    /// example when the host holds one or more references to error contexts.
4481    ///
4482    /// The key of this map is often referred to as the "rep" (i.e. the host-side,
4483    /// component-wide representation) of the index into concurrent state for a given
4484    /// stored `ErrorContext`.
4485    ///
4486    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4487    /// as a `TableId<ErrorContextState>`.
4488    global_error_context_ref_counts:
4489        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4490}
4491
4492impl Default for ConcurrentState {
4493    fn default() -> Self {
4494        Self {
4495            guest_thread: None,
4496            table: AlwaysMut::new(ResourceTable::new()),
4497            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4498            instance_states: HashMap::new(),
4499            high_priority: Vec::new(),
4500            low_priority: Vec::new(),
4501            suspend_reason: None,
4502            worker: None,
4503            worker_item: None,
4504            global_error_context_ref_counts: BTreeMap::new(),
4505        }
4506    }
4507}
4508
4509impl ConcurrentState {
4510    /// Take ownership of any fibers and futures owned by this object.
4511    ///
4512    /// This should be used when disposing of the `Store` containing this object
4513    /// in order to gracefully resolve any and all fibers using
4514    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4515    /// use-after-free bugs due to fibers which may still have access to the
4516    /// `Store`.
4517    ///
4518    /// Additionally, the futures collected with this function should be dropped
4519    /// within a `tls::set` call, which will ensure that any futures closing
4520    /// over an `&Accessor` will have access to the store when dropped, allowing
4521    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4522    /// panicking.
4523    ///
4524    /// Note that this will leave the object in an inconsistent and unusable
4525    /// state, so it should only be used just prior to dropping it.
4526    pub(crate) fn take_fibers_and_futures(
4527        &mut self,
4528        fibers: &mut Vec<StoreFiber<'static>>,
4529        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4530    ) {
4531        for entry in self.table.get_mut().iter_mut() {
4532            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4533                for mode in mem::take(&mut set.waiting).into_values() {
4534                    if let WaitMode::Fiber(fiber) = mode {
4535                        fibers.push(fiber);
4536                    }
4537                }
4538            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4539                if let GuestThreadState::Suspended(fiber) =
4540                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4541                {
4542                    fibers.push(fiber);
4543                }
4544            }
4545        }
4546
4547        if let Some(fiber) = self.worker.take() {
4548            fibers.push(fiber);
4549        }
4550
4551        let mut take_items = |list| {
4552            for item in mem::take(list) {
4553                match item {
4554                    WorkItem::ResumeFiber(fiber) => {
4555                        fibers.push(fiber);
4556                    }
4557                    WorkItem::PushFuture(future) => {
4558                        self.futures
4559                            .get_mut()
4560                            .as_mut()
4561                            .unwrap()
4562                            .push(future.into_inner());
4563                    }
4564                    _ => {}
4565                }
4566            }
4567        };
4568
4569        take_items(&mut self.high_priority);
4570        take_items(&mut self.low_priority);
4571
4572        if let Some(them) = self.futures.get_mut().take() {
4573            futures.push(them);
4574        }
4575    }
4576
4577    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4578        self.instance_states.entry(instance).or_default()
4579    }
4580
4581    fn push<V: Send + Sync + 'static>(
4582        &mut self,
4583        value: V,
4584    ) -> Result<TableId<V>, ResourceTableError> {
4585        self.table.get_mut().push(value).map(TableId::from)
4586    }
4587
4588    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4589        self.table.get_mut().get_mut(&Resource::from(id))
4590    }
4591
4592    pub fn add_child<T: 'static, U: 'static>(
4593        &mut self,
4594        child: TableId<T>,
4595        parent: TableId<U>,
4596    ) -> Result<(), ResourceTableError> {
4597        self.table
4598            .get_mut()
4599            .add_child(Resource::from(child), Resource::from(parent))
4600    }
4601
4602    pub fn remove_child<T: 'static, U: 'static>(
4603        &mut self,
4604        child: TableId<T>,
4605        parent: TableId<U>,
4606    ) -> Result<(), ResourceTableError> {
4607        self.table
4608            .get_mut()
4609            .remove_child(Resource::from(child), Resource::from(parent))
4610    }
4611
4612    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4613        self.table.get_mut().delete(Resource::from(id))
4614    }
4615
4616    fn push_future(&mut self, future: HostTaskFuture) {
4617        // Note that we can't directly push to `ConcurrentState::futures` here
4618        // since this may be called from a future that's being polled inside
4619        // `StoreContextMut::poll_until`, which temporarily removes the `FuturesUnordered`
4620        // so it has exclusive access while polling it.  Therefore, we push a
4621        // work item to the "high priority" queue, which will actually push to
4622        // `ConcurrentState::futures` later.
4623        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4624    }
4625
4626    fn push_high_priority(&mut self, item: WorkItem) {
4627        log::trace!("push high priority: {item:?}");
4628        self.high_priority.push(item);
4629    }
4630
4631    fn push_low_priority(&mut self, item: WorkItem) {
4632        log::trace!("push low priority: {item:?}");
4633        self.low_priority.push(item);
4634    }
4635
4636    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4637        if high_priority {
4638            self.push_high_priority(item);
4639        } else {
4640            self.push_low_priority(item);
4641        }
4642    }
4643
4644    /// Determine whether the instance associated with the specified guest task
4645    /// may be entered (i.e. is not already on the async call stack).
4646    ///
4647    /// This is an additional check on top of the "may_enter" instance flag;
4648    /// it's needed because async-lifted exports with callback functions must
4649    /// not call their own instances directly or indirectly, and due to the
4650    /// "stackless" nature of callback-enabled guest tasks this may happen even
4651    /// if there are no activation records on the stack (i.e. the "may_enter"
4652    /// field is `true`) for that instance.
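    ///
    /// For example, if a callback-enabled task in instance A calls an import
    /// implemented by instance B, and B then calls back into A, walking the
    /// caller chain from the innermost task will encounter A again and this
    /// function will return `false`.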
4653    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4654        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4655
4656        // Walk the task tree back to the root, looking for potential
4657        // reentrance.
4658        //
4659        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4660        // such that each bit represents an instance which has been entered by
4661        // that task or an ancestor of that task, in which case this would be a
4662        // constant time check.
4663        loop {
4664            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4665                Caller::Host { .. } => break true,
4666                Caller::Guest { thread, instance } => {
4667                    if *instance == guest_instance {
4668                        break false;
4669                    } else {
4670                        *thread
4671                    }
4672                }
4673            };
4674            guest_task = next_thread.task;
4675        }
4676    }
4677
4678    /// Record that we're about to enter a (sub-)component instance which does
4679    /// not support more than one concurrent, stackful activation, meaning it
4680    /// cannot be entered again until the next call returns.
4681    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4682        self.instance_state(instance).do_not_enter = true;
4683    }
4684
4685    /// Record that we've exited a (sub-)component instance previously entered
4686    /// with `Self::enter_instance`, then call `Self::partition_pending`.
4687    /// See the documentation for the latter for details.
4688    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4689        self.instance_state(instance).do_not_enter = false;
4690        self.partition_pending(instance)
4691    }
4692
4693    /// Iterate over `InstanceState::pending`, moving any ready items into the
4694    /// "high priority" work item queue.
4695    ///
4696    /// See `GuestCall::is_ready` for details.
4697    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4698        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4699            let call = GuestCall { thread, kind };
4700            if call.is_ready(self)? {
4701                self.push_high_priority(WorkItem::GuestCall(call));
4702            } else {
4703                self.instance_state(instance)
4704                    .pending
4705                    .insert(call.thread, call.kind);
4706            }
4707        }
4708
4709        Ok(())
4710    }
4711
4712    /// Implements the `backpressure.{set,inc,dec}` intrinsics.
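    ///
    /// A rough illustration of how the three intrinsics could be expressed in
    /// terms of `modify` (hypothetical sketch; the real intrinsic glue lives
    /// in the callers of this function):
    ///
    /// ```ignore
    /// // backpressure.set: overwrite the counter with 0 or 1.
    /// state.backpressure_modify(instance, |_| Some(u16::from(enabled)))?;
    /// // backpressure.inc: fail on counter overflow.
    /// state.backpressure_modify(instance, |n| n.checked_add(1))?;
    /// // backpressure.dec: fail on counter underflow.
    /// state.backpressure_modify(instance, |n| n.checked_sub(1))?;
    /// ```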
4713    pub(crate) fn backpressure_modify(
4714        &mut self,
4715        caller_instance: RuntimeComponentInstanceIndex,
4716        modify: impl FnOnce(u16) -> Option<u16>,
4717    ) -> Result<()> {
4718        let state = self.instance_state(caller_instance);
4719        let old = state.backpressure;
4720        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4721        state.backpressure = new;
4722
4723        if old > 0 && new == 0 {
4724            // Backpressure was previously enabled and is now disabled; move any
4725            // newly-eligible guest calls to the "high priority" queue.
4726            self.partition_pending(caller_instance)?;
4727        }
4728
4729        Ok(())
4730    }
4731
4732    /// Implements the `context.get` intrinsic.
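    ///
    /// `slot` indexes into the fixed-size, per-thread `context` array (two
    /// `u32` slots; see `GuestThread`).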
4733    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4734        let thread = self.guest_thread.unwrap();
4735        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4736        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4737        Ok(val)
4738    }
4739
4740    /// Implements the `context.set` intrinsic.
4741    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4742        let thread = self.guest_thread.unwrap();
4743        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4744        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4745        Ok(())
4746    }
4747
4748    /// Returns whether there's a pending cancellation for the current guest
4749    /// thread's task, consuming the `Event::Cancelled` event if so.
4750    fn take_pending_cancellation(&mut self) -> bool {
4751        let thread = self.guest_thread.unwrap();
4752        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4753            assert!(matches!(event, Event::Cancelled));
4754            true
4755        } else {
4756            false
4757        }
4758    }
4759}
4760
4761/// Provide a type hint to compiler about the shape of a parameter lower
4762/// closure.
4763fn for_any_lower<
4764    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4765>(
4766    fun: F,
4767) -> F {
4768    fun
4769}
4770
4771/// Provide a type hint to compiler about the shape of a result lift closure.
4772fn for_any_lift<
4773    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4774>(
4775    fun: F,
4776) -> F {
4777    fun
4778}
4779
4780/// Wrap the specified future in a `poll_fn` which asserts that the future is
4781/// only polled from the event loop of the specified `Store`.
4782///
4783/// See `StoreContextMut::run_concurrent` for details.
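///
/// For example, `queue_call` below wraps the `oneshot::Receiver` for a
/// prepared call's result in `checked` so that polling it outside the store's
/// event loop panics with a descriptive message rather than misbehaving.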
4784fn checked<F: Future + Send + 'static>(
4785    id: StoreId,
4786    fut: F,
4787) -> impl Future<Output = F::Output> + Send + 'static {
4788    async move {
4789        let mut fut = pin!(fut);
4790        future::poll_fn(move |cx| {
4791            let message = "\
4792                `Future`s which depend on asynchronous component tasks, streams, or \
4793                futures to complete may only be polled from the event loop of the \
4794                store to which they belong.  Please use \
4795                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4796            ";
4797            tls::try_get(|store| {
4798                let matched = match store {
4799                    tls::TryGet::Some(store) => store.id() == id,
4800                    tls::TryGet::Taken | tls::TryGet::None => false,
4801                };
4802
4803                if !matched {
4804                    panic!("{message}")
4805                }
4806            });
4807            fut.as_mut().poll(cx)
4808        })
4809        .await
4810    }
4811}
4812
4813/// Assert that `StoreContextMut::run_concurrent` has not been called from
4814/// within a store's event loop.
4815fn check_recursive_run() {
4816    tls::try_get(|store| {
4817        if !matches!(store, tls::TryGet::None) {
4818            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4819        }
4820    });
4821}
4822
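/// Split the 32-bit code returned by an async callback into its low four bits
/// (the status code) and the remaining upper bits (typically a waitable-set
/// index), per the Component Model callback return-value encoding.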
4823fn unpack_callback_code(code: u32) -> (u32, u32) {
4824    (code & 0xF, code >> 4)
4825}
4826
4827/// Helper struct for packaging parameters to be passed to
4828/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4829/// `waitable-set.poll`.
4830struct WaitableCheckParams {
4831    set: TableId<WaitableSet>,
4832    options: OptionsIndex,
4833    payload: u32,
4834}
4835
4836/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4837enum WaitableCheck {
4838    Wait(WaitableCheckParams),
4839    Poll(WaitableCheckParams),
4840}
4841
4842/// Represents a guest task called from the host, prepared using `prepare_call`.
4843pub(crate) struct PreparedCall<R> {
4844    /// The guest export to be called
4845    handle: Func,
4846    /// The guest thread created by `prepare_call`
4847    thread: QualifiedThreadId,
4848    /// The number of lowered core Wasm parameters to pass to the call.
4849    param_count: usize,
4850    /// The `oneshot::Receiver` to which the result of the call will be
4851    /// delivered when it is available.
4852    rx: oneshot::Receiver<LiftedResult>,
4853    /// The `oneshot::Receiver` which will resolve when the task -- and any
4854    /// transitive subtasks -- have all exited.
4855    exit_rx: oneshot::Receiver<()>,
4856    _phantom: PhantomData<R>,
4857}
4858
4859impl<R> PreparedCall<R> {
4860    /// Get a copy of the `TaskId` for this `PreparedCall`.
4861    pub(crate) fn task_id(&self) -> TaskId {
4862        TaskId {
4863            task: self.thread.task,
4864        }
4865    }
4866}
4867
4868/// Represents a task created by `prepare_call`.
4869pub(crate) struct TaskId {
4870    task: TableId<GuestTask>,
4871}
4872
4873impl TaskId {
4874    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
4875    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
4876    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
4877    /// and can be resumed by other tasks for this component, so we mark the future as dropped
4878    /// and delete the task when all threads are done.
4879    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4880        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4881        if !task.already_lowered_parameters() {
4882            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4883        } else {
4884            task.host_future_state = HostFutureState::Dropped;
4885            if task.ready_to_delete() {
4886                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4887            }
4888        }
4889        Ok(())
4890    }
4891}
4892
4893/// Prepare a call to the specified exported Wasm function, providing functions
4894/// for lowering the parameters and lifting the result.
4895///
4896/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4897/// loop, use `queue_call`.
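///
/// A simplified sketch of the intended flow (names and closures here are
/// placeholders; see e.g. `Func::call_concurrent` for a real caller):
///
/// ```ignore
/// let prepared = prepare_call::<_, MyResult>(
///     store.as_context_mut(),
///     func,
///     param_count,
///     /* host_future_present */ true,
///     /* call_post_return_automatically */ true,
///     lower_params,
///     lift_result,
/// )?;
/// let future = queue_call(store.as_context_mut(), prepared)?;
/// // `future` resolves to the lifted result, but must only be polled from
/// // within the store's event loop, e.g. via `StoreContextMut::run_concurrent`.
/// ```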
4898pub(crate) fn prepare_call<T, R>(
4899    mut store: StoreContextMut<T>,
4900    handle: Func,
4901    param_count: usize,
4902    host_future_present: bool,
4903    call_post_return_automatically: bool,
4904    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4905    + Send
4906    + Sync
4907    + 'static,
4908    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4909    + Send
4910    + Sync
4911    + 'static,
4912) -> Result<PreparedCall<R>> {
4913    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4914
4915    let instance = handle.instance().id().get(store.0);
4916    let options = &instance.component().env_component().options[options];
4917    let task_return_type = instance.component().types()[ty].results;
4918    let component_instance = raw_options.instance;
4919    let callback = options.callback.map(|i| instance.runtime_callback(i));
4920    let memory = options
4921        .memory()
4922        .map(|i| instance.runtime_memory(i))
4923        .map(SendSyncPtr::new);
4924    let string_encoding = options.string_encoding;
4925    let token = StoreToken::new(store.as_context_mut());
4926    let state = store.0.concurrent_state_mut();
4927
4928    assert!(state.guest_thread.is_none());
4929
4930    let (tx, rx) = oneshot::channel();
4931    let (exit_tx, exit_rx) = oneshot::channel();
4932
4933    let mut task = GuestTask::new(
4934        state,
4935        Box::new(for_any_lower(move |store, params| {
4936            lower_params(handle, token.as_context_mut(store), params)
4937        })),
4938        LiftResult {
4939            lift: Box::new(for_any_lift(move |store, result| {
4940                lift_result(handle, store, result)
4941            })),
4942            ty: task_return_type,
4943            memory,
4944            string_encoding,
4945        },
4946        Caller::Host {
4947            tx: Some(tx),
4948            exit_tx: Arc::new(exit_tx),
4949            host_future_present,
4950            call_post_return_automatically,
4951        },
4952        callback.map(|callback| {
4953            let callback = SendSyncPtr::new(callback);
4954            let instance = handle.instance();
4955            Box::new(
4956                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
4957                    let store = token.as_context_mut(store);
4958                    // SAFETY: Per the contract of `prepare_call`, the callback
4959                    // will remain valid at least as long as this task exists.
4960                    unsafe {
4961                        instance.call_callback(
4962                            store,
4963                            runtime_instance,
4964                            callback,
4965                            event,
4966                            handle,
4967                            call_post_return_automatically,
4968                        )
4969                    }
4970                },
4971            ) as CallbackFn
4972        }),
4973        component_instance,
4974    )?;
4975    task.function_index = Some(handle.index());
4976
4977    let task = state.push(task)?;
4978    let thread = state.push(GuestThread::new_implicit(task))?;
4979    state.get_mut(task)?.threads.insert(thread);
4980
4981    Ok(PreparedCall {
4982        handle,
4983        thread: QualifiedThreadId { task, thread },
4984        param_count,
4985        rx,
4986        exit_rx,
4987        _phantom: PhantomData,
4988    })
4989}
4990
4991/// Queue a call previously prepared using `prepare_call` to be run as part of
4992/// the associated `ComponentInstance`'s event loop.
4993///
4994/// The returned future will resolve to the result once it is available, but
4995/// must only be polled via the instance's event loop. See
4996/// `StoreContextMut::run_concurrent` for details.
4997pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4998    mut store: StoreContextMut<T>,
4999    prepared: PreparedCall<R>,
5000) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5001    let PreparedCall {
5002        handle,
5003        thread,
5004        param_count,
5005        rx,
5006        exit_rx,
5007        ..
5008    } = prepared;
5009
5010    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5011
5012    Ok(checked(
5013        store.0.id(),
5014        rx.map(move |result| {
5015            result
5016                .map(|v| (*v.downcast().unwrap(), exit_rx))
5017                .map_err(anyhow::Error::from)
5018        }),
5019    ))
5020}
5021
5022/// Queue a call previously prepared using `prepare_call` to be run as part of
5023/// the associated `ComponentInstance`'s event loop.
5024fn queue_call0<T: 'static>(
5025    store: StoreContextMut<T>,
5026    handle: Func,
5027    guest_thread: QualifiedThreadId,
5028    param_count: usize,
5029) -> Result<()> {
5030    let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5031    let is_concurrent = raw_options.async_;
5032    let callback = raw_options.callback;
5033    let instance = handle.instance();
5034    let callee = handle.lifted_core_func(store.0);
5035    let post_return = handle.post_return_core_func(store.0);
5036    let callback = callback.map(|i| {
5037        let instance = instance.id().get(store.0);
5038        SendSyncPtr::new(instance.runtime_callback(i))
5039    });
5040
5041    log::trace!("queueing call {guest_thread:?}");
5042
5043    let instance_flags = if callback.is_none() {
5044        None
5045    } else {
5046        Some(flags)
5047    };
5048
5049    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5050    // (with signatures appropriate for this call) and will remain valid as
5051    // long as this instance is valid.
5052    unsafe {
5053        instance.queue_call(
5054            store,
5055            guest_thread,
5056            SendSyncPtr::new(callee),
5057            param_count,
5058            1,
5059            instance_flags,
5060            is_concurrent,
5061            callback,
5062            post_return.map(SendSyncPtr::new),
5063        )
5064    }
5065}