wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
58use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
59use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
60use anyhow::{Context as _, Result, anyhow, bail};
61use error_contexts::GlobalErrorContextRefCount;
62use futures::channel::oneshot;
63use futures::future::{self, Either, FutureExt};
64use futures::stream::{FuturesUnordered, StreamExt};
65use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
66use std::any::Any;
67use std::borrow::ToOwned;
68use std::boxed::Box;
69use std::cell::UnsafeCell;
70use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
71use std::fmt;
72use std::future::Future;
73use std::marker::PhantomData;
74use std::mem::{self, ManuallyDrop, MaybeUninit};
75use std::ops::DerefMut;
76use std::pin::{Pin, pin};
77use std::ptr::{self, NonNull};
78use std::slice;
79use std::sync::Arc;
80use std::task::{Context, Poll, Waker};
81use std::vec::Vec;
82use table::{TableDebug, TableId};
83use wasmtime_environ::Trap;
84use wasmtime_environ::component::{
85    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
86    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
87    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
88    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
89    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
90};
91
92pub use abort::JoinHandle;
93pub use futures_and_streams::{
94    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
95    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
96    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
97};
98pub(crate) use futures_and_streams::{
99    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
100};
101
102mod abort;
103mod error_contexts;
104mod futures_and_streams;
105pub(crate) mod table;
106pub(crate) mod tls;
107
/// Sentinel defined by the Component Model spec: the value used to indicate
/// that an async intrinsic (e.g. `future.write`) has not yet completed.
const BLOCKED: u32 = u32::MAX;
111
/// Corresponds to `CallState` in the upstream spec.
///
/// Each variant's discriminant is the numeric value handed to the guest; see
/// [`Status::pack`].
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and the optional `waitable` provided into a 32-bit
    /// result that the canonical ABI requires.
    ///
    /// The low 4 bits are reserved for the status while the upper 28 bits are
    /// the waitable, if present.
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied for [`Status::Returned`], if a
    /// waitable is missing for any other status, or if the waitable does not
    /// fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        /// Number of low bits reserved for the status code.
        const STATUS_BITS: u32 = 4;

        // A waitable must be present exactly when the status is not
        // `Returned`.
        assert!(
            matches!(self, Status::Returned) == waitable.is_none(),
            "status {self:?} is inconsistent with waitable {waitable:?}: a waitable must be \
             provided if and only if the status is not `Returned`"
        );
        let waitable = waitable.unwrap_or(0);
        // Shifting a larger handle would silently drop its high bits.
        assert!(
            waitable < (1 << (u32::BITS - STATUS_BITS)),
            "waitable handle {waitable} does not fit in the upper 28 bits of the packed status"
        );
        (waitable << STATUS_BITS) | (self as u32)
    }
}
135
136/// Corresponds to `EventCode` in the Component Model spec, plus related payload
137/// data.
138#[derive(Clone, Copy, Debug)]
139enum Event {
140    None,
141    Cancelled,
142    Subtask {
143        status: Status,
144    },
145    StreamRead {
146        code: ReturnCode,
147        pending: Option<(TypeStreamTableIndex, u32)>,
148    },
149    StreamWrite {
150        code: ReturnCode,
151        pending: Option<(TypeStreamTableIndex, u32)>,
152    },
153    FutureRead {
154        code: ReturnCode,
155        pending: Option<(TypeFutureTableIndex, u32)>,
156    },
157    FutureWrite {
158        code: ReturnCode,
159        pending: Option<(TypeFutureTableIndex, u32)>,
160    },
161}
162
163impl Event {
164    /// Lower this event to core Wasm integers for delivery to the guest.
165    ///
166    /// Note that the waitable handle, if any, is assumed to be lowered
167    /// separately.
168    fn parts(self) -> (u32, u32) {
169        const EVENT_NONE: u32 = 0;
170        const EVENT_SUBTASK: u32 = 1;
171        const EVENT_STREAM_READ: u32 = 2;
172        const EVENT_STREAM_WRITE: u32 = 3;
173        const EVENT_FUTURE_READ: u32 = 4;
174        const EVENT_FUTURE_WRITE: u32 = 5;
175        const EVENT_CANCELLED: u32 = 6;
176        match self {
177            Event::None => (EVENT_NONE, 0),
178            Event::Cancelled => (EVENT_CANCELLED, 0),
179            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
180            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
181            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
182            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
183            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
184        }
185    }
186}
187
/// Corresponds to `CallbackCode` in the spec.
///
/// NOTE(review): these are presumably the values returned by the callback of
/// an async-lifted export to tell the host what to do next; the code that
/// interprets them is outside this part of the file — confirm there.
mod callback_code {
    /// Numeric value of `CallbackCode.EXIT`.
    pub const EXIT: u32 = 0;
    /// Numeric value of `CallbackCode.YIELD`.
    pub const YIELD: u32 = 1;
    /// Numeric value of `CallbackCode.WAIT`.
    pub const WAIT: u32 = 2;
    /// Numeric value of `CallbackCode.POLL`.
    pub const POLL: u32 = 3;
}
195
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// The value is re-exported from `wasmtime_environ` and converted to `u32`
/// with an `as` cast.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
200
201/// Provides access to either store data (via the `get` method) or the store
202/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
203/// instance to which the current host task belongs.
204///
205/// See [`Accessor::with`] for details.
206pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
207    store: StoreContextMut<'a, T>,
208    get_data: fn(&mut T) -> D::Data<'_>,
209}
210
211impl<'a, T, D> Access<'a, T, D>
212where
213    D: HasData + ?Sized,
214    T: 'static,
215{
216    /// Creates a new [`Access`] from its component parts.
217    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
218        Self { store, get_data }
219    }
220
221    /// Get mutable access to the store data.
222    pub fn data_mut(&mut self) -> &mut T {
223        self.store.data_mut()
224    }
225
226    /// Get mutable access to the store data.
227    pub fn get(&mut self) -> D::Data<'_> {
228        (self.get_data)(self.data_mut())
229    }
230
231    /// Spawn a background task.
232    ///
233    /// See [`Accessor::spawn`] for details.
234    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
235    where
236        T: 'static,
237    {
238        let accessor = Accessor {
239            get_data: self.get_data,
240            token: StoreToken::new(self.store.as_context_mut()),
241        };
242        self.store
243            .as_context_mut()
244            .spawn_with_accessor(accessor, task)
245    }
246
247    /// Returns the getter this accessor is using to project from `T` into
248    /// `D::Data`.
249    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
250        self.get_data
251    }
252}
253
254impl<'a, T, D> AsContext for Access<'a, T, D>
255where
256    D: HasData + ?Sized,
257    T: 'static,
258{
259    type Data = T;
260
261    fn as_context(&self) -> StoreContext<'_, T> {
262        self.store.as_context()
263    }
264}
265
266impl<'a, T, D> AsContextMut for Access<'a, T, D>
267where
268    D: HasData + ?Sized,
269    T: 'static,
270{
271    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
272        self.store.as_context_mut()
273    }
274}
275
276/// Provides scoped mutable access to store data in the context of a concurrent
277/// host task future.
278///
279/// This allows multiple host task futures to execute concurrently and access
280/// the store between (but not across) `await` points.
281///
282/// # Rationale
283///
284/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
285/// `D::Data<'_>`. The problem this is solving, however, is that it does not
286/// literally store these values. The basic problem is that when a concurrent
287/// host future is being polled it has access to `&mut T` (and the whole
288/// `Store`) but when it's not being polled it does not have access to these
289/// values. This reflects how the store is only ever polling one future at a
290/// time so the store is effectively being passed between futures.
291///
292/// Rust's `Future` trait, however, has no means of passing a `Store`
293/// temporarily between futures. The [`Context`](std::task::Context) type does
294/// not have the ability to attach arbitrary information to it at this time.
295/// This type, [`Accessor`], is used to bridge this expressivity gap.
296///
297/// The [`Accessor`] type here represents the ability to acquire, temporarily in
298/// a synchronous manner, the current store. The [`Accessor::with`] function
299/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
300/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
301/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
303/// how the store is temporarily made available while a host future is being
304/// polled.
305///
306/// # Implementation
307///
308/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
309/// this type additionally doesn't even have a lifetime parameter. This is
310/// instead a representation of proof of the ability to acquire these while a
311/// future is being polled. Wasmtime will, when it polls a host future,
312/// configure ambient state such that the `Accessor` that a future closes over
313/// will work and be able to access the store.
314///
315/// This has a number of implications for users such as:
316///
317/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
318///   the lifetime of a single future.
319/// * A future is expected to, however, close over an `Accessor` and keep it
320///   alive probably for the duration of the entire future.
321/// * Different host futures will be given different `Accessor`s, and that's
322///   intentional.
323/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
324///   alleviates some otherwise required bounds to be written down.
325///
326/// # Using `Accessor` in `Drop`
327///
328/// The methods on `Accessor` are only expected to work in the context of
329/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
330/// host future can be dropped at any time throughout the system and Wasmtime
331/// store context is not necessarily available at that time. It's recommended to
332/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they may panic or have unintended results. If you run into this though
334/// feel free to file an issue on the Wasmtime repository.
335pub struct Accessor<T: 'static, D = HasSelf<T>>
336where
337    D: HasData + ?Sized,
338{
339    token: StoreToken<T>,
340    get_data: fn(&mut T) -> D::Data<'_>,
341}
342
343/// A helper trait to take any type of accessor-with-data in functions.
344///
345/// This trait is similar to [`AsContextMut`] except that it's used when
346/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
347/// [`Accessor`] is the main type used in concurrent settings and is passed to
348/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
349///
350/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
351/// this trait. This effectively means that regardless of the `D` in
352/// `Accessor<T, D>` it can still be passed to a function which just needs a
353/// store accessor.
354///
355/// Acquiring an [`Accessor`] can be done through
356/// [`StoreContextMut::run_concurrent`] for example or in a host function
357/// through
358/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns a borrow of the underlying [`Accessor`] that this value refers
    /// to.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
370
371impl<T: AsAccessor + ?Sized> AsAccessor for &T {
372    type Data = T::Data;
373    type AccessorData = T::AccessorData;
374
375    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
376        T::as_accessor(self)
377    }
378}
379
380impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
381    type Data = T;
382    type AccessorData = D;
383
384    fn as_accessor(&self) -> &Accessor<T, D> {
385        self
386    }
387}
388
389// Note that it is intentional at this time that `Accessor` does not actually
390// store `&mut T` or anything similar. This distinctly enables the `Accessor`
391// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
392// that matter). This is used to ergonomically simplify bindings where the
393// majority of the time `Accessor` is closed over in a future which then needs
394// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
395// you already have to write `T: 'static`...) it helps to avoid this.
396//
397// Note as well that `Accessor` doesn't actually store its data at all. Instead
398// it's more of a "proof" of what can be accessed from TLS. API design around
399// `Accessor` and functions like `Linker::func_wrap_concurrent` are
400// intentionally made to ensure that `Accessor` is ideally only used in the
401// context that TLS variables are actually set. For example host functions are
402// given `&Accessor`, not `Accessor`, and this prevents them from persisting
403// the value outside of a future. Within the future the TLS variables are all
404// guaranteed to be set while the future is being polled.
405//
406// Finally though this is not an ironclad guarantee, but nor does it need to be.
407// The TLS APIs are designed to panic or otherwise model usage where they're
408// called recursively or similar. It's hoped that code cannot be constructed to
409// actually hit this at runtime but this is not a safety requirement at this
410// time.
411const _: () = {
412    const fn assert<T: Send + Sync>() {}
413    assert::<Accessor<UnsafeCell<u32>>>();
414};
415
416impl<T> Accessor<T> {
417    /// Creates a new `Accessor` backed by the specified functions.
418    ///
419    /// - `get`: used to retrieve the store
420    ///
421    /// - `get_data`: used to "project" from the store's associated data to
422    /// another type (e.g. a field of that data or a wrapper around it).
423    ///
424    /// - `spawn`: used to queue spawned background tasks to be run later
425    pub(crate) fn new(token: StoreToken<T>) -> Self {
426        Self {
427            token,
428            get_data: |x| x,
429        }
430    }
431}
432
433impl<T, D> Accessor<T, D>
434where
435    D: HasData + ?Sized,
436{
437    /// Run the specified closure, passing it mutable access to the store.
438    ///
439    /// This function is one of the main building blocks of the [`Accessor`]
440    /// type. This yields synchronous, blocking, access to store via an
441    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
442    /// providing the ability to access `D` via [`Access::get`]. Note that the
443    /// `fun` here is given only temporary access to the store and `T`/`D`
444    /// meaning that the return value `R` here is not allowed to capture borrows
445    /// into the two. If access is needed to data within `T` or `D` outside of
446    /// this closure then it must be `clone`d out, for example.
447    ///
448    /// # Panics
449    ///
450    /// This function will panic if it is call recursively with any other
451    /// accessor already in scope. For example if `with` is called within `fun`,
452    /// then this function will panic. It is up to the embedder to ensure that
453    /// this does not happen.
454    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
455        tls::get(|vmstore| {
456            fun(Access {
457                store: self.token.as_context_mut(vmstore),
458                get_data: self.get_data,
459            })
460        })
461    }
462
463    /// Returns the getter this accessor is using to project from `T` into
464    /// `D::Data`.
465    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
466        self.get_data
467    }
468
469    /// Changes this accessor to access `D2` instead of the current type
470    /// parameter `D`.
471    ///
472    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
473    ///
474    /// # Panics
475    ///
476    /// When using this API the returned value is disconnected from `&self` and
477    /// the lifetime binding the `self` argument. An `Accessor` only works
478    /// within the context of the closure or async closure that it was
479    /// originally given to, however. This means that due to the fact that the
480    /// returned value has no lifetime connection it's possible to use the
481    /// accessor outside of `&self`, the original accessor, and panic.
482    ///
483    /// The returned value should only be used within the scope of the original
484    /// `Accessor` that `self` refers to.
485    pub fn with_getter<D2: HasData>(
486        &self,
487        get_data: fn(&mut T) -> D2::Data<'_>,
488    ) -> Accessor<T, D2> {
489        Accessor {
490            token: self.token,
491            get_data,
492        }
493    }
494
495    /// Spawn a background task which will receive an `&Accessor<T, D>` and
496    /// run concurrently with any other tasks in progress for the current
497    /// store.
498    ///
499    /// This is particularly useful for host functions which return a `stream`
500    /// or `future` such that the code to write to the write end of that
501    /// `stream` or `future` must run after the function returns.
502    ///
503    /// The returned [`JoinHandle`] may be used to cancel the task.
504    ///
505    /// # Panics
506    ///
507    /// Panics if called within a closure provided to the [`Accessor::with`]
508    /// function. This can only be called outside an active invocation of
509    /// [`Accessor::with`].
510    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
511    where
512        T: 'static,
513    {
514        let accessor = self.clone_for_spawn();
515        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
516    }
517
518    fn clone_for_spawn(&self) -> Self {
519        Self {
520            token: self.token,
521            get_data: self.get_data,
522        }
523    }
524}
525
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and returns a `Send` future resolving to `R`; the
    /// future may use `accessor` to reach the store.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
544
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw core Wasm values supplied by the caller (presumably the
        /// lowered parameters — confirm at the construction site).
        params: Vec<ValRaw>,
        /// Presumably whether the call produces a result — confirm at the
        /// construction site.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw core Wasm values supplied by the caller (presumably the
        /// lowered parameters — confirm at the construction site).
        params: Vec<ValRaw>,
        /// Presumably the number of core Wasm result values expected by the
        /// caller — confirm at the construction site.
        result_count: u32,
    },
}
559
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// NOTE(review): the payload is presumably the suspended fiber to resume
    /// once an event arrives — confirm at the use site.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    ///
    /// NOTE(review): the payload is presumably the instance whose callback
    /// should be invoked — confirm at the use site.
    Callback(Instance),
}
568
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The guest thread that is waiting.
        thread: QualifiedThreadId,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding { thread: QualifiedThreadId },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending { thread: QualifiedThreadId },
}
587
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// `GuestCall::is_ready` reports this kind as not ready while the target
    /// instance has `do_not_enter` set or a non-zero `backpressure` count.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
    /// Carries the same kind of closure as `StartImplicit`, but
    /// `GuestCall::is_ready` always reports it as ready, regardless of the
    /// instance's `do_not_enter` and `backpressure` state.
    ///
    /// NOTE(review): presumably used for starts requested explicitly rather
    /// than scheduled by the event loop — confirm at the construction site.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
606
607impl fmt::Debug for GuestCallKind {
608    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
609        match self {
610            Self::DeliverEvent { instance, set } => f
611                .debug_struct("DeliverEvent")
612                .field("instance", instance)
613                .field("set", set)
614                .finish(),
615            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
616            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
617        }
618    }
619}
620
621/// Represents a pending call into guest code for a given guest thread.
622#[derive(Debug)]
623struct GuestCall {
624    thread: QualifiedThreadId,
625    kind: GuestCallKind,
626}
627
628impl GuestCall {
629    /// Returns whether or not the call is ready to run.
630    ///
631    /// A call will not be ready to run if either:
632    ///
633    /// - the (sub-)component instance to be called has already been entered and
634    /// cannot be reentered until an in-progress call completes
635    ///
636    /// - the call is for a not-yet started task and the (sub-)component
637    /// instance to be called has backpressure enabled
638    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
639        let task_instance = state.get_mut(self.thread.task)?.instance;
640        let state = state.instance_state(task_instance);
641
642        let ready = match &self.kind {
643            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
644            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
645            GuestCallKind::StartExplicit(_) => true,
646        };
647        log::trace!(
648            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
649            state.do_not_enter,
650            state.backpressure
651        );
652        Ok(ready)
653    }
654}
655
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure to run with access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
661
/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
///
/// Carried by `WorkItem::Poll`.
#[derive(Debug)]
struct PollParams {
    /// The instance to which the polling thread belongs.
    instance: Instance,
    /// The polling thread.
    thread: QualifiedThreadId,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}
673
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber: a one-shot closure given access to the
    /// store.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
688
689impl fmt::Debug for WorkItem {
690    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
691        match self {
692            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
693            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
694            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
695            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
696            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
697        }
698    }
699}
700
/// Whether a suspension intrinsic was cancelled or completed
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
707
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// # Parameters
///
/// - `store`: the store whose concurrent state tracks the call.
/// - `future`: the host function's future; its `Ok` value is what this
///   function returns.
/// - `caller_instance`: recorded in the temporary `HostTask` created to track
///   the call.
///
/// # Errors
///
/// Returns an error if `future` resolves to an error, if the caller's task
/// handle cannot be found, or if any of the table, join, or suspend
/// operations fail.
///
/// # Panics
///
/// - If there is no current guest thread and `future` is still pending after
///   its first poll.
/// - If the result stashed in `GuestTask::result` is missing or is not an `R`
///   when it is retrieved at the end.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // If there is no current guest thread set, that means the host function was
    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
    // should complete immediately.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                unreachable!()
            }
        };
    };

    // Save any existing result stashed in `GuestTask::result` so we can replace
    // it with the new result.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Add a temporary host task into the table so we can track its progress.
    // Note that we'll never allocate a waitable handle for the guest since
    // we're being called synchronously.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        // An error from the host future short-circuits here, so nothing is
        // stashed and no event is set in that case.
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // It completed immediately; check the result and delete the task.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // It did not complete immediately; add it to
            // `ConcurrentState::futures` so it will be polled via the event
            // loop; then use `GuestTask::sync_call_set` to wait for the task to
            // complete, suspending the current fiber until it does so.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
            })?;
        }
    }

    // Retrieve and return the result.
    // The previously saved `old_result` is put back in its place.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
812
813/// Execute the specified guest call.
814fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
815    match call.kind {
816        GuestCallKind::DeliverEvent { instance, set } => {
817            let (event, waitable) = instance
818                .get_event(store, call.thread.task, set, true)?
819                .unwrap();
820            let state = store.concurrent_state_mut();
821            let task = state.get_mut(call.thread.task)?;
822            let runtime_instance = task.instance;
823            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
824
825            log::trace!(
826                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
827                call.thread,
828            );
829
830            let old_thread = state.guest_thread.replace(call.thread);
831            log::trace!(
832                "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
833                call.thread
834            );
835
836            store.maybe_push_call_context(call.thread.task)?;
837
838            let state = store.concurrent_state_mut();
839            state.enter_instance(runtime_instance);
840
841            let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
842
843            let code = callback(store, runtime_instance, event, handle)?;
844
845            let state = store.concurrent_state_mut();
846
847            state.get_mut(call.thread.task)?.callback = Some(callback);
848            state.exit_instance(runtime_instance)?;
849
850            store.maybe_pop_call_context(call.thread.task)?;
851
852            instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
853
854            store.concurrent_state_mut().guest_thread = old_thread;
855            log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
856        }
857        GuestCallKind::StartImplicit(fun) => {
858            fun(store)?;
859        }
860        GuestCallKind::StartExplicit(fun) => {
861            fun(store)?;
862        }
863    }
864
865    Ok(())
866}
867
868impl<T> Store<T> {
869    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
870    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
871    where
872        T: Send + 'static,
873    {
874        self.as_context_mut().run_concurrent(fun).await
875    }
876
877    #[doc(hidden)]
878    pub fn assert_concurrent_state_empty(&mut self) {
879        self.as_context_mut().assert_concurrent_state_empty();
880    }
881
882    /// Convenience wrapper for [`StoreContextMut::spawn`].
883    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
884    where
885        T: 'static,
886    {
887        self.as_context_mut().spawn(task)
888    }
889}
890
891impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked tables, queues, or maps is non-empty, if a
    /// guest thread is still recorded as current, or if
    /// `ConcurrentState::futures` is currently `None` (it is unwrapped here).
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        // Check the per-component guest tables first; they live in the store
        // data rather than in the concurrent state.
        store
            .store_data_mut()
            .components
            .assert_guest_tables_empty();
        let state = store.concurrent_state_mut();
        // The table is included in the panic message to make leaks easier to
        // diagnose.
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        // No queued work items of either priority.
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        // No guest thread should be marked as currently running.
        assert!(state.guest_thread.is_none());
        // No host or background futures still pending.
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        // No guest calls parked in any instance's `pending` map.
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }
925
926    /// Spawn a background task to run as part of this instance's event loop.
927    ///
928    /// The task will receive an `&Accessor<U>` and run concurrently with
929    /// any other tasks in progress for the instance.
930    ///
931    /// Note that the task will only make progress if and when the event loop
932    /// for this instance is run.
933    ///
934    /// The returned [`SpawnHandle`] may be used to cancel the task.
935    pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
936    where
937        T: 'static,
938    {
939        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
940        self.spawn_with_accessor(accessor, task)
941    }
942
943    /// Internal implementation of `spawn` functions where a `store` is
944    /// available along with an `Accessor`.
945    fn spawn_with_accessor<D>(
946        self,
947        accessor: Accessor<T, D>,
948        task: impl AccessorTask<T, D, Result<()>>,
949    ) -> JoinHandle
950    where
951        T: 'static,
952        D: HasData + ?Sized,
953    {
954        // Create an "abortable future" here where internally the future will
955        // hook calls to poll and possibly spawn more background tasks on each
956        // iteration.
957        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
958        self.0
959            .concurrent_state_mut()
960            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
961        handle
962    }
963
964    /// Run the specified closure `fun` to completion as part of this store's
965    /// event loop.
966    ///
967    /// This will run `fun` as part of this store's event loop until it
968    /// yields a result.  `fun` is provided an [`Accessor`], which provides
969    /// controlled access to the store and its data.
970    ///
971    /// This function can be used to invoke [`Func::call_concurrent`] for
972    /// example within the async closure provided here.
973    ///
974    /// # Store-blocking behavior
975    ///
976    ///
977    ///
978    /// At this time there are certain situations in which the `Future` returned
979    /// by the `AsyncFnOnce` passed to this function will not be polled for an
980    /// extended period of time, despite one or more `Waker::wake` events having
981    /// occurred for the task to which it belongs.  This can manifest as the
982    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
983    /// the `Store` being held by e.g. a blocking host function, preventing the
984    /// `Future` from being polled. A canonical example of this is when the
985    /// `fun` provided to this function attempts to set a timeout for an
986    /// invocation of a wasm function. In this situation the async closure is
987    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
988    /// to elapse. At this time this setup will not always work and the timeout
989    /// may not reliably fire.
990    ///
991    /// This function will not block the current thread and as such is always
992    /// suitable to run in an `async` context, but the current implementation of
993    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress for the closure to make progress. This is an
995    /// artifact of Wasmtime's historical implementation of `async` functions
996    /// and is the topic of [#11869] and [#11870]. In the timeout example from
997    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
998    /// progress for a readiness notification of (b) to get delivered.
999    ///
1000    /// This effectively means that it's not possible to reliably perform a
1001    /// "select" operation within the `fun` closure, which timeouts for example
1002    /// are based on. Fixing this requires some relatively major refactoring
1003    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1004    /// that is intended to be fixed one day. In the meantime it's recommended
1005    /// to apply timeouts or such to the entire `run_concurrent` call itself
1006    /// rather than internally.
1007    ///
1008    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1009    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1010    ///
1011    /// # Example
1012    ///
1013    /// ```
1014    /// # use {
1015    /// #   anyhow::{Result},
1016    /// #   wasmtime::{
1017    /// #     component::{ Component, Linker, Resource, ResourceTable},
1018    /// #     Config, Engine, Store
1019    /// #   },
1020    /// # };
1021    /// #
1022    /// # struct MyResource(u32);
1023    /// # struct Ctx { table: ResourceTable }
1024    /// #
1025    /// # async fn foo() -> Result<()> {
1026    /// # let mut config = Config::new();
1027    /// # let engine = Engine::new(&config)?;
1028    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1029    /// # let mut linker = Linker::new(&engine);
1030    /// # let component = Component::new(&engine, "")?;
1031    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1032    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1033    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1034    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1035    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1036    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1037    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1038    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1039    ///    Ok(())
1040    /// }).await??;
1041    /// # Ok(())
1042    /// # }
1043    /// ```
1044    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1045    where
1046        T: Send + 'static,
1047    {
1048        self.do_run_concurrent(fun, false).await
1049    }
1050
1051    pub(super) async fn run_concurrent_trap_on_idle<R>(
1052        self,
1053        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1054    ) -> Result<R>
1055    where
1056        T: Send + 'static,
1057    {
1058        self.do_run_concurrent(fun, true).await
1059    }
1060
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` to obtain a future, then drives
    /// that future with `poll_until`, forwarding `trap_on_idle` unchanged.
    /// The result of `poll_until` is returned as-is.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects nested event-loop runs -- confirm
        // against the definition of `check_recursive_run`.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the store context together with the future produced by `fun`,
        // so that the future's destructor runs inside a `tls::set` scope for
        // that store, however this function exits.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        // `self` moves into the guard here; the store is reached through
        // `dropper.store` from now on.
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        // Drive the event loop until the future completes, or until it idles
        // when `trap_on_idle` is set.
        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1103
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration polls `future`, then `ConcurrentState::futures`, then
    /// drains the high-priority work queue, or the low-priority one if the
    /// former is empty, and handles the drained items one at a time.
    ///
    /// # Errors
    ///
    /// Returns an error if any future in `ConcurrentState::futures` resolves
    /// to an error, if handling a work item fails, or -- when `trap_on_idle`
    /// is true -- with `Trap::AsyncDeadlock` once there are no pending
    /// futures and no work items while `future` is still pending.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts the futures taken out of `ConcurrentState::futures`
        // back into the store when dropped, whether that happens via the
        // explicit `drop(reset)` below or because this future was dropped
        // while polling.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // Yields `Either::Left(value)` when `future` has completed and
            // `Either::Right(items)` when there is a (possibly empty) batch of
            // work items to handle before looping again.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The outcome is `Ready(true)` if one completed successfully,
                // `Ready(false)` if the set is empty, and `Pending` otherwise.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    // Guard which cleans up any work items not yet handled
                    // when it is dropped, e.g. because handling an earlier
                    // item failed: fibers are disposed, queued futures are
                    // dropped inside a `tls::set` scope for the store, and
                    // every other kind of item is simply dropped.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Handle the items in order; on error the `?` returns
                    // early and `dispose` cleans up the rest.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1271
1272    /// Handle the specified work item, possibly resuming a fiber if applicable.
1273    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1274    where
1275        T: Send,
1276    {
1277        log::trace!("handle work item {item:?}");
1278        match item {
1279            WorkItem::PushFuture(future) => {
1280                self.0
1281                    .concurrent_state_mut()
1282                    .futures
1283                    .get_mut()
1284                    .as_mut()
1285                    .unwrap()
1286                    .push(future.into_inner());
1287            }
1288            WorkItem::ResumeFiber(fiber) => {
1289                self.0.resume_fiber(fiber).await?;
1290            }
1291            WorkItem::GuestCall(call) => {
1292                let state = self.0.concurrent_state_mut();
1293                if call.is_ready(state)? {
1294                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1295                } else {
1296                    let task = state.get_mut(call.thread.task)?;
1297                    if !task.starting_sent {
1298                        task.starting_sent = true;
1299                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1300                            Waitable::Guest(call.thread.task).set_event(
1301                                state,
1302                                Some(Event::Subtask {
1303                                    status: Status::Starting,
1304                                }),
1305                            )?;
1306                        }
1307                    }
1308
1309                    let runtime_instance = state.get_mut(call.thread.task)?.instance;
1310                    state
1311                        .instance_state(runtime_instance)
1312                        .pending
1313                        .insert(call.thread, call.kind);
1314                }
1315            }
1316            WorkItem::Poll(params) => {
1317                let state = self.0.concurrent_state_mut();
1318                if state.get_mut(params.thread.task)?.event.is_some()
1319                    || !state.get_mut(params.set)?.ready.is_empty()
1320                {
1321                    // There's at least one event immediately available; deliver
1322                    // it to the guest ASAP.
1323                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1324                        thread: params.thread,
1325                        kind: GuestCallKind::DeliverEvent {
1326                            instance: params.instance,
1327                            set: Some(params.set),
1328                        },
1329                    }));
1330                } else {
1331                    // There are no events immediately available; deliver
1332                    // `Event::None` to the guest.
1333                    state.get_mut(params.thread.task)?.event = Some(Event::None);
1334                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1335                        thread: params.thread,
1336                        kind: GuestCallKind::DeliverEvent {
1337                            instance: params.instance,
1338                            set: Some(params.set),
1339                        },
1340                    }));
1341                }
1342            }
1343            WorkItem::WorkerFunction(fun) => {
1344                self.run_on_worker(WorkerItem::Function(fun)).await?;
1345            }
1346        }
1347
1348        Ok(())
1349    }
1350
1351    /// Execute the specified guest call on a worker fiber.
1352    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1353    where
1354        T: Send,
1355    {
1356        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1357            fiber
1358        } else {
1359            fiber::make_fiber(self.0, move |store| {
1360                loop {
1361                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1362                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1363                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1364                    }
1365
1366                    store.suspend(SuspendReason::NeedWork)?;
1367                }
1368            })?
1369        };
1370
1371        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1372        assert!(worker_item.is_none());
1373        *worker_item = Some(item);
1374
1375        self.0.resume_fiber(worker).await
1376    }
1377
1378    /// Wrap the specified host function in a future which will call it, passing
1379    /// it an `&Accessor<T>`.
1380    ///
1381    /// See the `Accessor` documentation for details.
1382    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1383    where
1384        T: 'static,
1385        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1386            + Send
1387            + Sync
1388            + 'static,
1389        R: Send + Sync + 'static,
1390    {
1391        let token = StoreToken::new(self);
1392        async move {
1393            let mut accessor = Accessor::new(token);
1394            closure(&mut accessor).await
1395        }
1396    }
1397}
1398
1399impl StoreOpaque {
1400    /// Resume the specified fiber, giving it exclusive access to the specified
1401    /// store.
1402    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1403        let old_thread = self.concurrent_state_mut().guest_thread;
1404        log::trace!("resume_fiber: save current thread {old_thread:?}");
1405
1406        let fiber = fiber::resolve_or_release(self, fiber).await?;
1407
1408        let state = self.concurrent_state_mut();
1409
1410        state.guest_thread = old_thread;
1411        if let Some(ref ot) = old_thread {
1412            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1413        }
1414        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1415
1416        if let Some(mut fiber) = fiber {
1417            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1418            // See the `SuspendReason` documentation for what each case means.
1419            match state.suspend_reason.take().unwrap() {
1420                SuspendReason::NeedWork => {
1421                    if state.worker.is_none() {
1422                        state.worker = Some(fiber);
1423                    } else {
1424                        fiber.dispose(self);
1425                    }
1426                }
1427                SuspendReason::Yielding { thread, .. } => {
1428                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1429                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1430                }
1431                SuspendReason::ExplicitlySuspending { thread, .. } => {
1432                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1433                }
1434                SuspendReason::Waiting { set, thread } => {
1435                    let old = state
1436                        .get_mut(set)?
1437                        .waiting
1438                        .insert(thread, WaitMode::Fiber(fiber));
1439                    assert!(old.is_none());
1440                }
1441            };
1442        } else {
1443            log::trace!("resume_fiber: fiber has exited");
1444        }
1445
1446        Ok(())
1447    }
1448
1449    /// Suspend the current fiber, storing the reason in
1450    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1451    /// it should be resumed.
1452    ///
1453    /// See the `SuspendReason` documentation for details.
1454    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1455        log::trace!("suspend fiber: {reason:?}");
1456
1457        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1458        // pop the call context which manages resource borrows before suspending
1459        // and then push it again once we've resumed.
1460        let task = match &reason {
1461            SuspendReason::Yielding { thread, .. }
1462            | SuspendReason::Waiting { thread, .. }
1463            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1464            SuspendReason::NeedWork => None,
1465        };
1466
1467        let old_guest_thread = if let Some(task) = task {
1468            self.maybe_pop_call_context(task)?;
1469            self.concurrent_state_mut().guest_thread
1470        } else {
1471            None
1472        };
1473
1474        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1475        assert!(suspend_reason.is_none());
1476        *suspend_reason = Some(reason);
1477
1478        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1479
1480        if let Some(task) = task {
1481            self.concurrent_state_mut().guest_thread = old_guest_thread;
1482            self.maybe_push_call_context(task)?;
1483        }
1484
1485        Ok(())
1486    }
1487
1488    /// Push the call context for managing resource borrows for the specified
1489    /// guest task if it has not yet either returned a result or cancelled
1490    /// itself.
1491    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1492        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1493
1494        if !task.returned_or_cancelled() {
1495            log::trace!("push call context for {guest_task:?}");
1496            let call_context = task.call_context.take().unwrap();
1497            self.component_resource_state().0.push(call_context);
1498        }
1499        Ok(())
1500    }
1501
1502    /// Pop the call context for managing resource borrows for the specified
1503    /// guest task if it has not yet either returned a result or cancelled
1504    /// itself.
1505    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1506        if !self
1507            .concurrent_state_mut()
1508            .get_mut(guest_task)?
1509            .returned_or_cancelled()
1510        {
1511            log::trace!("pop call context for {guest_task:?}");
1512            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1513            self.concurrent_state_mut()
1514                .get_mut(guest_task)?
1515                .call_context = call_context;
1516        }
1517        Ok(())
1518    }
1519
1520    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1521        let state = self.concurrent_state_mut();
1522        let caller = state.guest_thread.unwrap();
1523        let old_set = waitable.common(state)?.set;
1524        let set = state.get_mut(caller.task)?.sync_call_set;
1525        waitable.join(state, Some(set))?;
1526        self.suspend(SuspendReason::Waiting {
1527            set,
1528            thread: caller,
1529        })?;
1530        let state = self.concurrent_state_mut();
1531        waitable.join(state, old_set)
1532    }
1533}
1534
1535impl Instance {
1536    /// Get the next pending event for the specified task and (optional)
1537    /// waitable set, along with the waitable handle if applicable.
1538    fn get_event(
1539        self,
1540        store: &mut StoreOpaque,
1541        guest_task: TableId<GuestTask>,
1542        set: Option<TableId<WaitableSet>>,
1543        cancellable: bool,
1544    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1545        let state = store.concurrent_state_mut();
1546
1547        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1548            log::trace!("deliver event {event:?} to {guest_task:?}");
1549
1550            if cancellable || !matches!(event, Event::Cancelled) {
1551                return Ok(Some((event, None)));
1552            } else {
1553                state.get_mut(guest_task)?.event = Some(event);
1554            }
1555        }
1556
1557        Ok(
1558            if let Some((set, waitable)) = set
1559                .and_then(|set| {
1560                    state
1561                        .get_mut(set)
1562                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1563                        .transpose()
1564                })
1565                .transpose()?
1566            {
1567                let common = waitable.common(state)?;
1568                let handle = common.handle.unwrap();
1569                let event = common.event.take().unwrap();
1570
1571                log::trace!(
1572                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1573                );
1574
1575                waitable.on_delivery(store, self, event);
1576
1577                Some((event, Some((waitable, handle))))
1578            } else {
1579                None
1580            },
1581        )
1582    }
1583
    /// Handle the `CallbackCode` returned from an async-lifted export or its
    /// callback.
    ///
    /// `code` is split by `unpack_callback_code` into the callback code proper
    /// and a waitable-set handle, then dispatched as follows:
    ///
    /// - `EXIT`: the implicit thread is cleaned up.  This fails with
    ///   `Trap::NoAsyncResult` if the task has no threads left and has
    ///   neither returned nor been cancelled.  A task called by the host is
    ///   deleted if it is ready to be; a task called by a guest is marked as
    ///   exited and its callback is cleared.
    /// - `YIELD`: an `Event::None` is stored on the task and its delivery is
    ///   queued at low priority.
    /// - `WAIT` / `POLL`: the set handle is resolved (a handle of zero is
    ///   rejected).  If an event is already available, its delivery is queued
    ///   at high priority.  Otherwise `POLL` queues a low-priority poll and
    ///   `WAIT` registers the thread as a waiter on the set.
    ///
    /// Any other code results in an error.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<()> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Resolve a guest-provided waitable-set handle to the corresponding
        // table id, rejecting a handle of zero.
        let get_set = |store, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // Exiting the last thread without having returned or
                // cancelled means the task will never produce a result.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                        task.callback = None;
                    }
                }
            }
            callback_code::YIELD => {
                // Push this thread onto the "low priority" queue so it runs after
                // any other threads have had a chance to run.
                let task = state.get_mut(guest_thread.task)?;
                assert!(task.event.is_none());
                task.event = Some(Event::None);
                state.push_low_priority(WorkItem::GuestCall(GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                }));
            }
            callback_code::WAIT | callback_code::POLL => {
                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is immediately available; deliver it ASAP.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    // No event is immediately available.
                    match code {
                        callback_code::POLL => {
                            // We're polling, so just yield and check whether an
                            // event has arrived after that.
                            state.push_low_priority(WorkItem::Poll(PollParams {
                                instance: self,
                                thread: guest_thread,
                                set,
                            }));
                        }
                        callback_code::WAIT => {
                            // We're waiting, so register to be woken up when an
                            // event is published for this waitable set.
                            //
                            // Here we also set `wake_on_cancel` on the
                            // waiting thread, which allows `subtask.cancel`
                            // to interrupt the wait.
                            let old = state
                                .get_mut(guest_thread.thread)?
                                .wake_on_cancel
                                .replace(set);
                            assert!(old.is_none());
                            let old = state
                                .get_mut(set)?
                                .waiting
                                .insert(guest_thread, WaitMode::Callback(self));
                            assert!(old.is_none());
                        }
                        // The enclosing arm only matches `WAIT` and `POLL`.
                        _ => unreachable!(),
                    }
                }
            }
            _ => bail!("unsupported callback code: {code}"),
        }

        Ok(())
    }
1699
1700    fn cleanup_thread(
1701        self,
1702        store: &mut StoreOpaque,
1703        guest_thread: QualifiedThreadId,
1704        runtime_instance: RuntimeComponentInstanceIndex,
1705    ) -> Result<()> {
1706        let guest_id = store
1707            .concurrent_state_mut()
1708            .get_mut(guest_thread.thread)?
1709            .instance_rep;
1710        self.id().get_mut(store).guest_tables().0[runtime_instance]
1711            .guest_thread_remove(guest_id.unwrap())?;
1712
1713        store.concurrent_state_mut().delete(guest_thread.thread)?;
1714        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715        task.threads.remove(&guest_thread.thread);
1716        Ok(())
1717    }
1718
1719    /// Add the specified guest call to the "high priority" work item queue, to
1720    /// be started as soon as backpressure and/or reentrance rules allow.
1721    ///
1722    /// SAFETY: The raw pointer arguments must be valid references to guest
1723    /// functions (with the appropriate signatures) when the closures queued by
1724    /// this function are called.
1725    unsafe fn queue_call<T: 'static>(
1726        self,
1727        mut store: StoreContextMut<T>,
1728        guest_thread: QualifiedThreadId,
1729        callee: SendSyncPtr<VMFuncRef>,
1730        param_count: usize,
1731        result_count: usize,
1732        flags: Option<InstanceFlags>,
1733        async_: bool,
1734        callback: Option<SendSyncPtr<VMFuncRef>>,
1735        post_return: Option<SendSyncPtr<VMFuncRef>>,
1736    ) -> Result<()> {
1737        /// Return a closure which will call the specified function in the scope
1738        /// of the specified task.
1739        ///
1740        /// This will use `GuestTask::lower_params` to lower the parameters, but
1741        /// will not lift the result; instead, it returns a
1742        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1743        /// any, may be lifted.  Note that an async-lifted export will have
1744        /// returned its result using the `task.return` intrinsic (or not
1745        /// returned a result at all, in the case of `task.cancel`), in which
1746        /// case the "result" of this call will either be a callback code or
1747        /// nothing.
1748        ///
1749        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1750        /// the returned closure is called.
1751        unsafe fn make_call<T: 'static>(
1752            store: StoreContextMut<T>,
1753            guest_thread: QualifiedThreadId,
1754            callee: SendSyncPtr<VMFuncRef>,
1755            param_count: usize,
1756            result_count: usize,
1757            flags: Option<InstanceFlags>,
1758        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1759        + Send
1760        + Sync
1761        + 'static
1762        + use<T> {
1763            let token = StoreToken::new(store);
1764            move |store: &mut dyn VMStore| {
1765                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1766
1767                store
1768                    .concurrent_state_mut()
1769                    .get_mut(guest_thread.thread)?
1770                    .state = GuestThreadState::Running;
1771                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1772                let may_enter_after_call = task.call_post_return_automatically();
1773                let lower = task.lower_params.take().unwrap();
1774
1775                lower(store, &mut storage[..param_count])?;
1776
1777                let mut store = token.as_context_mut(store);
1778
1779                // SAFETY: Per the contract documented in `make_call's`
1780                // documentation, `callee` must be a valid pointer.
1781                unsafe {
1782                    if let Some(mut flags) = flags {
1783                        flags.set_may_enter(false);
1784                    }
1785                    crate::Func::call_unchecked_raw(
1786                        &mut store,
1787                        callee.as_non_null(),
1788                        NonNull::new(
1789                            &mut storage[..param_count.max(result_count)]
1790                                as *mut [MaybeUninit<ValRaw>] as _,
1791                        )
1792                        .unwrap(),
1793                    )?;
1794                    if let Some(mut flags) = flags {
1795                        flags.set_may_enter(may_enter_after_call);
1796                    }
1797                }
1798
1799                Ok(storage)
1800            }
1801        }
1802
1803        // SAFETY: Per the contract described in this function documentation,
1804        // the `callee` pointer which `call` closes over must be valid when
1805        // called by the closure we queue below.
1806        let call = unsafe {
1807            make_call(
1808                store.as_context_mut(),
1809                guest_thread,
1810                callee,
1811                param_count,
1812                result_count,
1813                flags,
1814            )
1815        };
1816
1817        let callee_instance = store
1818            .0
1819            .concurrent_state_mut()
1820            .get_mut(guest_thread.task)?
1821            .instance;
1822        let fun = if callback.is_some() {
1823            assert!(async_);
1824
1825            Box::new(move |store: &mut dyn VMStore| {
1826                self.add_guest_thread_to_instance_table(
1827                    guest_thread.thread,
1828                    store,
1829                    callee_instance,
1830                )?;
1831                let old_thread = store
1832                    .concurrent_state_mut()
1833                    .guest_thread
1834                    .replace(guest_thread);
1835                log::trace!(
1836                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1837                );
1838
1839                store.maybe_push_call_context(guest_thread.task)?;
1840
1841                store.concurrent_state_mut().enter_instance(callee_instance);
1842
1843                // SAFETY: See the documentation for `make_call` to review the
1844                // contract we must uphold for `call` here.
1845                //
1846                // Per the contract described in the `queue_call`
1847                // documentation, the `callee` pointer which `call` closes
1848                // over must be valid.
1849                let storage = call(store)?;
1850
1851                store
1852                    .concurrent_state_mut()
1853                    .exit_instance(callee_instance)?;
1854
1855                store.maybe_pop_call_context(guest_thread.task)?;
1856
1857                let state = store.concurrent_state_mut();
1858                state.guest_thread = old_thread;
1859                old_thread
1860                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1861                log::trace!("stackless call: restored {old_thread:?} as current thread");
1862
1863                // SAFETY: `wasmparser` will have validated that the callback
1864                // function returns a `i32` result.
1865                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1866
1867                self.handle_callback_code(store, guest_thread, callee_instance, code)?;
1868
1869                Ok(())
1870            }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1871        } else {
1872            let token = StoreToken::new(store.as_context_mut());
1873            Box::new(move |store: &mut dyn VMStore| {
1874                self.add_guest_thread_to_instance_table(
1875                    guest_thread.thread,
1876                    store,
1877                    callee_instance,
1878                )?;
1879                let old_thread = store
1880                    .concurrent_state_mut()
1881                    .guest_thread
1882                    .replace(guest_thread);
1883                log::trace!(
1884                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1885                );
1886                let mut flags = self.id().get(store).instance_flags(callee_instance);
1887
1888                store.maybe_push_call_context(guest_thread.task)?;
1889
1890                // Unless this is a callback-less (i.e. stackful)
1891                // async-lifted export, we need to record that the instance
1892                // cannot be entered until the call returns.
1893                if !async_ {
1894                    store.concurrent_state_mut().enter_instance(callee_instance);
1895                }
1896
1897                // SAFETY: See the documentation for `make_call` to review the
1898                // contract we must uphold for `call` here.
1899                //
1900                // Per the contract described in the `queue_call`
1901                // documentation, the `callee` pointer which `call` closes
1902                // over must be valid.
1903                let storage = call(store)?;
1904
1905                // This is a callback-less call, so the implicit thread has now completed
1906                self.cleanup_thread(store, guest_thread, callee_instance)?;
1907
1908                if async_ {
1909                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1910                    if task.threads.is_empty() && !task.returned_or_cancelled() {
1911                        bail!(Trap::NoAsyncResult);
1912                    }
1913                } else {
1914                    // This is a sync-lifted export, so now is when we lift the
1915                    // result, optionally call the post-return function, if any,
1916                    // and finally notify any current or future waiters that the
1917                    // subtask has returned.
1918
1919                    let lift = {
1920                        let state = store.concurrent_state_mut();
1921                        state.exit_instance(callee_instance)?;
1922
1923                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
1924
1925                        state
1926                            .get_mut(guest_thread.task)?
1927                            .lift_result
1928                            .take()
1929                            .unwrap()
1930                    };
1931
1932                    // SAFETY: `result_count` represents the number of core Wasm
1933                    // results returned, per `wasmparser`.
1934                    let result = (lift.lift)(store, unsafe {
1935                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1936                            &storage[..result_count],
1937                        )
1938                    })?;
1939
1940                    let post_return_arg = match result_count {
1941                        0 => ValRaw::i32(0),
1942                        // SAFETY: `result_count` represents the number of
1943                        // core Wasm results returned, per `wasmparser`.
1944                        1 => unsafe { storage[0].assume_init() },
1945                        _ => unreachable!(),
1946                    };
1947
1948                    if store
1949                        .concurrent_state_mut()
1950                        .get_mut(guest_thread.task)?
1951                        .call_post_return_automatically()
1952                    {
1953                        unsafe {
1954                            flags.set_may_leave(false);
1955                            flags.set_needs_post_return(false);
1956                        }
1957
1958                        if let Some(func) = post_return {
1959                            let mut store = token.as_context_mut(store);
1960
1961                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1962                            // either `wasmtime-cranelift`-generated fused adapter
1963                            // code or `component::Options`.  Per `wasmparser`
1964                            // post-return signature validation, we know it takes a
1965                            // single parameter.
1966                            unsafe {
1967                                crate::Func::call_unchecked_raw(
1968                                    &mut store,
1969                                    func.as_non_null(),
1970                                    slice::from_ref(&post_return_arg).into(),
1971                                )?;
1972                            }
1973                        }
1974
1975                        unsafe {
1976                            flags.set_may_leave(true);
1977                            flags.set_may_enter(true);
1978                        }
1979                    }
1980
1981                    self.task_complete(
1982                        store,
1983                        guest_thread.task,
1984                        result,
1985                        Status::Returned,
1986                        post_return_arg,
1987                    )?;
1988                }
1989
1990                store.maybe_pop_call_context(guest_thread.task)?;
1991
1992                let state = store.concurrent_state_mut();
1993                let task = state.get_mut(guest_thread.task)?;
1994
1995                match &task.caller {
1996                    Caller::Host { .. } => {
1997                        if task.ready_to_delete() {
1998                            Waitable::Guest(guest_thread.task).delete_from(state)?;
1999                        }
2000                    }
2001                    Caller::Guest { .. } => {
2002                        task.exited = true;
2003                    }
2004                }
2005
2006                Ok(())
2007            })
2008        };
2009
2010        store
2011            .0
2012            .concurrent_state_mut()
2013            .push_high_priority(WorkItem::GuestCall(GuestCall {
2014                thread: guest_thread,
2015                kind: GuestCallKind::StartImplicit(fun),
2016            }));
2017
2018        Ok(())
2019    }
2020
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// On success, a new guest task and implicit thread have been created for
    /// the callee, the task has been registered as a subtask of the calling
    /// task, and the new thread has been made the current one.
    ///
    /// This fails if the caller instance may not currently be left, or with
    /// `Trap::CannotEnterComponent` if the new task may not be entered.  It
    /// panics if `start` or `return_` is null, if there is no current guest
    /// thread, or if `string_encoding` is not a recognized encoding.
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        memory: *mut VMMemoryDefinition,
        string_encoding: u8,
        caller_info: CallerInfo,
    ) -> Result<()> {
        self.id().get(store.0).check_may_leave(caller_instance)?;

        // Where the caller expects to receive the result: via a return
        // pointer into its memory, or directly as (at most one) flat value.
        enum ResultInfo {
            Heap { results: u32 },
            Stack { result_count: u32 },
        }

        // In both `Heap` cases the return pointer is the caller's last
        // parameter.
        let result_info = match &caller_info {
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: params.last().unwrap().get_u32(),
            },
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
                results: params.last().unwrap().get_u32(),
            },
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        // Note that this clears the current thread; it is set to the new
        // thread at the end of this function.
        let old_thread = state.guest_thread.take();
        let new_task = GuestTask::new(
            state,
            // Parameter-lowering closure: copies the caller's parameters
            // into scratch space, runs `start` over it, and copies the
            // resulting values into `dst`.
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes count parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                dst.copy_from_slice(&src[..dst.len()]);
                // Record a `Started` event on the current task's waitable.
                let state = store.0.concurrent_state_mut();
                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                // Result-lifting closure: runs `return_` over the callee's
                // flat results (plus the caller's return pointer, if any).
                lift: Box::new(move |store, src| {
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }
                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }
                    let state = store.0.concurrent_state_mut();
                    let thread = state.guest_thread.unwrap();
                    // For a sync caller, stash the flat result (if there is
                    // one) on the task as its `sync_result`.
                    if sync_caller {
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
            },
            Caller::Guest {
                thread: old_thread.unwrap(),
                instance: caller_instance,
            },
            None,
            callee_instance,
        )?;

        // Register the task along with its implicit thread.
        let guest_task = state.push(new_task)?;
        let new_thread = GuestThread::new_implicit(guest_task);
        let guest_thread = state.push(new_thread)?;
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        let state = store.0.concurrent_state_mut();
        // NOTE(review): `old_thread` was already unwrapped when constructing
        // `Caller::Guest` above, so this pattern always matches.
        if let Some(old_thread) = old_thread {
            if !state.may_enter(guest_task) {
                bail!(crate::Trap::CannotEnterComponent);
            }

            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
        };

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        state.guest_thread = Some(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        });
        log::trace!(
            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
        );

        Ok(())
    }
2213
2214    /// Call the specified callback function for an async-lifted export.
2215    ///
2216    /// SAFETY: `function` must be a valid reference to a guest function of the
2217    /// correct signature for a callback.
2218    unsafe fn call_callback<T>(
2219        self,
2220        mut store: StoreContextMut<T>,
2221        callee_instance: RuntimeComponentInstanceIndex,
2222        function: SendSyncPtr<VMFuncRef>,
2223        event: Event,
2224        handle: u32,
2225        may_enter_after_call: bool,
2226    ) -> Result<u32> {
2227        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2228
2229        let (ordinal, result) = event.parts();
2230        let params = &mut [
2231            ValRaw::u32(ordinal),
2232            ValRaw::u32(handle),
2233            ValRaw::u32(result),
2234        ];
2235        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2236        // `wasmtime-cranelift`-generated fused adapter code or
2237        // `component::Options`.  Per `wasmparser` callback signature
2238        // validation, we know it takes three parameters and returns one.
2239        unsafe {
2240            flags.set_may_enter(false);
2241            crate::Func::call_unchecked_raw(
2242                &mut store,
2243                function.as_non_null(),
2244                params.as_mut_slice().into(),
2245            )?;
2246            flags.set_may_enter(may_enter_after_call);
2247        }
2248        Ok(params[0].get_u32())
2249    }
2250
2251    /// Start a guest->guest call previously prepared using
2252    /// `Self::prepare_call`.
2253    ///
2254    /// This is called from fused adapter code generated in
2255    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2256    /// this function immediately after calling `Self::prepare_call`.
2257    ///
2258    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2259    /// functions with the appropriate signatures for the current guest task.
2260    /// If this is a call to an async-lowered import, the actual call may be
2261    /// deferred and run after this function returns, in which case the pointer
2262    /// arguments must also be valid when the call happens.
    ///
    /// `storage` is `None` when the caller used an async-lowered import.  When
    /// it is `Some`, the callee's single flat result (if it produced one) is
    /// written to `storage[0]` before this function returns.
    ///
    /// The return value is the subtask's status packed together with the
    /// waitable handle allocated for it (if any) via `status.pack(..)`.  A
    /// handle is only allocated when the caller is async and the callee had
    /// not yet returned when the caller was resumed.
    ///
    /// Panics if there is no current guest thread, if `callee` is null, or if
    /// `param_count`/`result_count` exceed `MAX_FLAT_PARAMS`/`MAX_FLAT_RESULTS`.
2263    unsafe fn start_call<T: 'static>(
2264        self,
2265        mut store: StoreContextMut<T>,
2266        callback: *mut VMFuncRef,
2267        post_return: *mut VMFuncRef,
2268        callee: *mut VMFuncRef,
2269        param_count: u32,
2270        result_count: u32,
2271        flags: u32,
2272        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2273    ) -> Result<u32> {
2274        let token = StoreToken::new(store.as_context_mut());
        // A caller that supplies no result storage is calling through an
        // async-lowered import (see the wait loop below).
2275        let async_caller = storage.is_none();
2276        let state = store.0.concurrent_state_mut();
        // `Self::prepare_call` made the callee's thread the current one.
2277        let guest_thread = state.guest_thread.unwrap();
        // Captured up front so the callback closure below can hand it to
        // `Self::call_callback`, which sets the callee's `may_enter` flag to
        // this value after each callback invocation.
2278        let may_enter_after_call = state
2279            .get_mut(guest_thread.task)?
2280            .call_post_return_automatically();
2281        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2282        let param_count = usize::try_from(param_count).unwrap();
2283        assert!(param_count <= MAX_FLAT_PARAMS);
2284        let result_count = usize::try_from(result_count).unwrap();
2285        assert!(result_count <= MAX_FLAT_RESULTS);
2286
2287        let task = state.get_mut(guest_thread.task)?;
2288        if !callback.is_null() {
2289            // We're calling an async-lifted export with a callback, so store
2290            // the callback and related context as part of the task so we can
2291            // call it later when needed.
2292            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2293            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2294                let store = token.as_context_mut(store);
2295                unsafe {
2296                    self.call_callback::<T>(
2297                        store,
2298                        runtime_instance,
2299                        callback,
2300                        event,
2301                        handle,
2302                        may_enter_after_call,
2303                    )
2304                }
2305            }));
2306        }
2307
2308        let Caller::Guest {
2309            thread: caller,
2310            instance: runtime_instance,
2311        } = &task.caller
2312        else {
2313            // As of this writing, `start_call` is only used for guest->guest
2314            // calls.
2315            unreachable!()
2316        };
2317        let caller = *caller;
2318        let caller_instance = *runtime_instance;
2319
2320        let callee_instance = task.instance;
2321
        // The callee's instance flags are only passed along when a callback
        // was supplied.
2322        let instance_flags = if callback.is_null() {
2323            None
2324        } else {
2325            Some(self.id().get(store.0).instance_flags(callee_instance))
2326        };
2327
2328        // Queue the call as a "high priority" work item.
2329        unsafe {
2330            self.queue_call(
2331                store.as_context_mut(),
2332                guest_thread,
2333                callee,
2334                param_count,
2335                result_count,
2336                instance_flags,
2337                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2338                NonNull::new(callback).map(SendSyncPtr::new),
2339                NonNull::new(post_return).map(SendSyncPtr::new),
2340            )?;
2341        }
2342
2343        let state = store.0.concurrent_state_mut();
2344
2345        // Use the caller's `GuestTask::sync_call_set` to register interest in
2346        // the subtask...
2347        let guest_waitable = Waitable::Guest(guest_thread.task);
        // Remember the subtask's current set so it can be restored once we
        // are done waiting.
2348        let old_set = guest_waitable.common(state)?.set;
2349        let set = state.get_mut(caller.task)?.sync_call_set;
2350        guest_waitable.join(state, Some(set))?;
2351
2352        // ... and suspend this fiber temporarily while we wait for it to start.
2353        //
2354        // Note that we _could_ call the callee directly using the current fiber
2355        // rather than suspend this one, but that would make reasoning about the
2356        // event loop more complicated and is probably only worth doing if
2357        // there's a measurable performance benefit.  In addition, it would mean
2358        // blocking the caller if the callee calls a blocking sync-lowered
2359        // import, and as of this writing the spec says we must not do that.
2360        //
2361        // Alternatively, the fused adapter code could be modified to call the
2362        // callee directly without calling a host-provided intrinsic at all (in
2363        // which case it would need to do its own, inline backpressure checks,
2364        // etc.).  Again, we'd want to see a measurable performance benefit
2365        // before committing to such an optimization.  And again, we'd need to
2366        // update the spec to allow that.
2367        let (status, waitable) = loop {
2368            store.0.suspend(SuspendReason::Waiting {
2369                set,
2370                thread: caller,
2371            })?;
2372
2373            let state = store.0.concurrent_state_mut();
2374
2375            log::trace!("taking event for {:?}", guest_thread.task);
            // Anything other than a pending subtask event for the callee at
            // this point is treated as a bug.
2376            let event = guest_waitable.take_event(state)?;
2377            let Some(Event::Subtask { status }) = event else {
2378                unreachable!();
2379            };
2380
2381            log::trace!("status {status:?} for {:?}", guest_thread.task);
2382
2383            if status == Status::Returned {
2384                // It returned, so we can stop waiting.
2385                break (status, None);
2386            } else if async_caller {
2387                // It hasn't returned yet, but the caller is calling via an
2388                // async-lowered import, so we generate a handle for the task
2389                // waitable and return the status.
2390                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2391                    .subtask_insert_guest(guest_thread.task.rep())?;
2392                store
2393                    .0
2394                    .concurrent_state_mut()
2395                    .get_mut(guest_thread.task)?
2396                    .common
2397                    .handle = Some(handle);
2398                break (status, Some(handle));
2399            } else {
2400                // The callee hasn't returned yet, and the caller is calling via
2401                // a sync-lowered import, so we loop and keep waiting until the
2402                // callee returns.
2403            }
2404        };
2405
2406        let state = store.0.concurrent_state_mut();
2407
        // Put the subtask back into whichever set (if any) it belonged to
        // before it was temporarily joined to the caller's `sync_call_set`.
2408        guest_waitable.join(state, old_set)?;
2409
2410        if let Some(storage) = storage {
2411            // The caller used a sync-lowered import to call an async-lifted
2412            // export, in which case the result, if any, has been stashed in
2413            // `GuestTask::sync_result`.
2414            let task = state.get_mut(guest_thread.task)?;
2415            if let Some(result) = task.sync_result.take() {
2416                if let Some(result) = result {
2417                    storage[0] = MaybeUninit::new(result);
2418                }
2419
                // With the result consumed, delete the callee task if it has
                // exited and reports itself ready to delete.
2420                if task.exited {
2421                    if task.ready_to_delete() {
2422                        Waitable::Guest(guest_thread.task).delete_from(state)?;
2423                    }
2424                }
2425            }
2426        }
2427
2428        // Reset the current thread to point to the caller as it resumes control.
2429        state.guest_thread = Some(caller);
2430        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2431        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2432
2433        Ok(status.pack(waitable))
2434    }
2435
2436    /// Poll the specified future once on behalf of a guest->host call using an
2437    /// async-lowered import.
2438    ///
2439    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2440    /// `Pending`, add it to the set of futures to be polled as part of this
2441    /// instance's event loop until it completes, and then return
2442    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2443    ///
2444    /// Whether the future returns `Ready` immediately or later, the `lower`
2445    /// function will be used to lower the result, if any, into the guest caller's
2446    /// stack and linear memory unless the task has been cancelled.
    ///
    /// The waitable handle, when one is needed, is allocated in
    /// `caller_instance`'s table.  An error produced by the future or by
    /// `lower` during the immediate (`Ready`) path is returned directly from
    /// this function.
    ///
    /// Panics if there is no current guest thread.
2447    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2448        self,
2449        mut store: StoreContextMut<T>,
2450        future: impl Future<Output = Result<R>> + Send + 'static,
2451        caller_instance: RuntimeComponentInstanceIndex,
2452        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2453    ) -> Result<Option<u32>> {
2454        let token = StoreToken::new(store.as_context_mut());
2455        let state = store.0.concurrent_state_mut();
2456        let caller = state.guest_thread.unwrap();
2457
2458        // Create an abortable future which hooks calls to poll and manages call
2459        // context state for the future.
2460        let (join_handle, future) = JoinHandle::run(async move {
2461            let mut future = pin!(future);
            // Holds the call context popped after a `Pending` poll so it can
            // be pushed back just before the next poll.  It starts out `None`,
            // so nothing is pushed ahead of the very first poll.
2462            let mut call_context = None;
2463            future::poll_fn(move |cx| {
2464                // Push the call context for managing any resource borrows
2465                // for the task.
2466                tls::get(|store| {
2467                    if let Some(call_context) = call_context.take() {
2468                        token
2469                            .as_context_mut(store)
2470                            .0
2471                            .component_resource_state()
2472                            .0
2473                            .push(call_context);
2474                    }
2475                });
2476
2477                let result = future.as_mut().poll(cx);
2478
2479                if result.is_pending() {
2480                    // Pop the call context for managing any resource
2481                    // borrows for the task.
2482                    tls::get(|store| {
2483                        call_context = Some(
2484                            token
2485                                .as_context_mut(store)
2486                                .0
2487                                .component_resource_state()
2488                                .0
2489                                .pop()
2490                                .unwrap(),
2491                        );
2492                    });
2493                }
2494                result
2495            })
2496            .await
2497        });
2498
2499        // We create a new host task even though it might complete immediately
2500        // (in which case we won't need to pass a waitable back to the guest).
2501        // If it does complete immediately, we'll remove it before we return.
2502        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2503
2504        log::trace!("new host task child of {caller:?}: {task:?}");
2505
2506        let mut future = Box::pin(future);
2507
2508        // Finally, poll the future.  We can use a dummy `Waker` here because
2509        // we'll add the future to `ConcurrentState::futures` and poll it
2510        // automatically from the event loop if it doesn't complete immediately
2511        // here.
2512        let poll = tls::set(store.0, || {
2513            future
2514                .as_mut()
2515                .poll(&mut Context::from_waker(&Waker::noop()))
2516        });
2517
2518        Ok(match poll {
            // `None` denotes a cancelled task (see the `Pending` arm below).
            // NOTE(review): presumably nothing can have aborted the task
            // before its very first poll -- confirm.
2519            Poll::Ready(None) => unreachable!(),
2520            Poll::Ready(Some(result)) => {
2521                // It finished immediately; lower the result and delete the
2522                // task.
2523                lower(store.as_context_mut(), result?)?;
2524                log::trace!("delete host task {task:?} (already ready)");
2525                store.0.concurrent_state_mut().delete(task)?;
2526                None
2527            }
2528            Poll::Pending => {
2529                // It hasn't finished yet; add the future to
2530                // `ConcurrentState::futures` so it will be polled by the event
2531                // loop and allocate a waitable handle to return to the guest.
2532
2533                // Wrap the future in a closure responsible for lowering the result into
2534                // the guest's stack and memory, as well as notifying any waiters that
2535                // the task returned.
2536                let future =
2537                    Box::pin(async move {
2538                        let result = match future.await {
2539                            Some(result) => result?,
2540                            // Task was cancelled; nothing left to do.
2541                            None => return Ok(()),
2542                        };
2543                        tls::get(move |store| {
2544                            // Here we schedule a task to run on a worker fiber to do
2545                            // the lowering since it may involve a call to the guest's
2546                            // realloc function.  This is necessary because calling the
2547                            // guest while there are host embedder frames on the stack
2548                            // is unsound.
2549                            store.concurrent_state_mut().push_high_priority(
2550                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2551                                    lower(token.as_context_mut(store), result)?;
2552                                    let state = store.concurrent_state_mut();
                                    // Clearing the join handle marks the host
                                    // task as resolved: `subtask_drop` refuses
                                    // to drop a host task whose join handle is
                                    // still present.
2553                                    state.get_mut(task)?.join_handle.take();
2554                                    Waitable::Host(task).set_event(
2555                                        state,
2556                                        Some(Event::Subtask {
2557                                            status: Status::Returned,
2558                                        }),
2559                                    )
2560                                }))),
2561                            );
2562                            Ok(())
2563                        })
2564                    });
2565
2566                store.0.concurrent_state_mut().push_future(future);
                // Allocate the subtask handle in the caller's table and record
                // it on the host task.
2567                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2568                    .subtask_insert_host(task.rep())?;
2569                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2570                log::trace!(
2571                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2572                );
2573                Some(handle)
2574            }
2575        })
2576    }
2577
2578    /// Implements the `task.return` intrinsic, lifting the result for the
2579    /// current guest task.
2580    pub(crate) fn task_return(
2581        self,
2582        store: &mut dyn VMStore,
2583        caller: RuntimeComponentInstanceIndex,
2584        ty: TypeTupleIndex,
2585        options: OptionsIndex,
2586        storage: &[ValRaw],
2587    ) -> Result<()> {
2588        self.id().get(store).check_may_leave(caller)?;
2589        let state = store.concurrent_state_mut();
2590        let guest_thread = state.guest_thread.unwrap();
2591        let lift = state
2592            .get_mut(guest_thread.task)?
2593            .lift_result
2594            .take()
2595            .ok_or_else(|| {
2596                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2597            })?;
2598        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2599
2600        let CanonicalOptions {
2601            string_encoding,
2602            data_model,
2603            ..
2604        } = &self.id().get(store).component().env_component().options[options];
2605
2606        let invalid = ty != lift.ty
2607            || string_encoding != &lift.string_encoding
2608            || match data_model {
2609                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2610                    Some(memory) => {
2611                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2612                        let actual = self.id().get(store).runtime_memory(memory);
2613                        expected != actual.as_ptr()
2614                    }
2615                    // Memory not specified, meaning it didn't need to be
2616                    // specified per validation, so not invalid.
2617                    None => false,
2618                },
2619                // Always invalid as this isn't supported.
2620                CanonicalOptionsDataModel::Gc { .. } => true,
2621            };
2622
2623        if invalid {
2624            bail!("invalid `task.return` signature and/or options for current task");
2625        }
2626
2627        log::trace!("task.return for {guest_thread:?}");
2628
2629        let result = (lift.lift)(store, storage)?;
2630        self.task_complete(
2631            store,
2632            guest_thread.task,
2633            result,
2634            Status::Returned,
2635            ValRaw::i32(0),
2636        )
2637    }
2638
2639    /// Implements the `task.cancel` intrinsic.
2640    pub(crate) fn task_cancel(
2641        self,
2642        store: &mut StoreOpaque,
2643        caller: RuntimeComponentInstanceIndex,
2644    ) -> Result<()> {
2645        self.id().get(store).check_may_leave(caller)?;
2646        let state = store.concurrent_state_mut();
2647        let guest_thread = state.guest_thread.unwrap();
2648        let task = state.get_mut(guest_thread.task)?;
2649        if !task.cancel_sent {
2650            bail!("`task.cancel` called by task which has not been cancelled")
2651        }
2652        _ = task.lift_result.take().ok_or_else(|| {
2653            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2654        })?;
2655
2656        assert!(task.result.is_none());
2657
2658        log::trace!("task.cancel for {guest_thread:?}");
2659
2660        self.task_complete(
2661            store,
2662            guest_thread.task,
2663            Box::new(DummyResult),
2664            Status::ReturnCancelled,
2665            ValRaw::i32(0),
2666        )
2667    }
2668
2669    /// Complete the specified guest task (i.e. indicate that it has either
2670    /// returned a (possibly empty) result or cancelled itself).
2671    ///
2672    /// This will return any resource borrows and notify any current or future
2673    /// waiters that the task has completed.
2674    fn task_complete(
2675        self,
2676        store: &mut StoreOpaque,
2677        guest_task: TableId<GuestTask>,
2678        result: Box<dyn Any + Send + Sync>,
2679        status: Status,
2680        post_return_arg: ValRaw,
2681    ) -> Result<()> {
2682        if store
2683            .concurrent_state_mut()
2684            .get_mut(guest_task)?
2685            .call_post_return_automatically()
2686        {
2687            let (calls, host_table, _, instance) =
2688                store.component_resource_state_with_instance(self);
2689            ResourceTables {
2690                calls,
2691                host_table: Some(host_table),
2692                guest: Some(instance.guest_tables()),
2693            }
2694            .exit_call()?;
2695        } else {
2696            // As of this writing, the only scenario where `call_post_return_automatically`
2697            // would be false for a `GuestTask` is for host-to-guest calls using
2698            // `[Typed]Func::call_async`, in which case the `function_index`
2699            // should be a non-`None` value.
2700            let function_index = store
2701                .concurrent_state_mut()
2702                .get_mut(guest_task)?
2703                .function_index
2704                .unwrap();
2705            self.id()
2706                .get_mut(store)
2707                .post_return_arg_set(function_index, post_return_arg);
2708        }
2709
2710        let state = store.concurrent_state_mut();
2711        let task = state.get_mut(guest_task)?;
2712
2713        if let Caller::Host { tx, .. } = &mut task.caller {
2714            if let Some(tx) = tx.take() {
2715                _ = tx.send(result);
2716            }
2717        } else {
2718            task.result = Some(result);
2719            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2720        }
2721
2722        Ok(())
2723    }
2724
2725    /// Implements the `waitable-set.new` intrinsic.
2726    pub(crate) fn waitable_set_new(
2727        self,
2728        store: &mut StoreOpaque,
2729        caller_instance: RuntimeComponentInstanceIndex,
2730    ) -> Result<u32> {
2731        self.id().get_mut(store).check_may_leave(caller_instance)?;
2732        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2733        let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2734            .waitable_set_insert(set.rep())?;
2735        log::trace!("new waitable set {set:?} (handle {handle})");
2736        Ok(handle)
2737    }
2738
2739    /// Implements the `waitable-set.drop` intrinsic.
2740    pub(crate) fn waitable_set_drop(
2741        self,
2742        store: &mut StoreOpaque,
2743        caller_instance: RuntimeComponentInstanceIndex,
2744        set: u32,
2745    ) -> Result<()> {
2746        self.id().get_mut(store).check_may_leave(caller_instance)?;
2747        let rep =
2748            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2749
2750        log::trace!("drop waitable set {rep} (handle {set})");
2751
2752        let set = store
2753            .concurrent_state_mut()
2754            .delete(TableId::<WaitableSet>::new(rep))?;
2755
2756        if !set.waiting.is_empty() {
2757            bail!("cannot drop waitable set with waiters");
2758        }
2759
2760        Ok(())
2761    }
2762
2763    /// Implements the `waitable.join` intrinsic.
2764    pub(crate) fn waitable_join(
2765        self,
2766        store: &mut StoreOpaque,
2767        caller_instance: RuntimeComponentInstanceIndex,
2768        waitable_handle: u32,
2769        set_handle: u32,
2770    ) -> Result<()> {
2771        let mut instance = self.id().get_mut(store);
2772        instance.check_may_leave(caller_instance)?;
2773        let waitable =
2774            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2775
2776        let set = if set_handle == 0 {
2777            None
2778        } else {
2779            let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2780
2781            Some(TableId::<WaitableSet>::new(set))
2782        };
2783
2784        log::trace!(
2785            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2786        );
2787
2788        waitable.join(store.concurrent_state_mut(), set)
2789    }
2790
2791    /// Implements the `subtask.drop` intrinsic.
2792    pub(crate) fn subtask_drop(
2793        self,
2794        store: &mut StoreOpaque,
2795        caller_instance: RuntimeComponentInstanceIndex,
2796        task_id: u32,
2797    ) -> Result<()> {
2798        self.id().get_mut(store).check_may_leave(caller_instance)?;
2799        self.waitable_join(store, caller_instance, task_id, 0)?;
2800
2801        let (rep, is_host) =
2802            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2803
2804        let concurrent_state = store.concurrent_state_mut();
2805        let (waitable, expected_caller_instance, delete) = if is_host {
2806            let id = TableId::<HostTask>::new(rep);
2807            let task = concurrent_state.get_mut(id)?;
2808            if task.join_handle.is_some() {
2809                bail!("cannot drop a subtask which has not yet resolved");
2810            }
2811            (Waitable::Host(id), task.caller_instance, true)
2812        } else {
2813            let id = TableId::<GuestTask>::new(rep);
2814            let task = concurrent_state.get_mut(id)?;
2815            if task.lift_result.is_some() {
2816                bail!("cannot drop a subtask which has not yet resolved");
2817            }
2818            if let Caller::Guest { instance, .. } = &task.caller {
2819                (Waitable::Guest(id), *instance, task.exited)
2820            } else {
2821                unreachable!()
2822            }
2823        };
2824
2825        waitable.common(concurrent_state)?.handle = None;
2826
2827        if waitable.take_event(concurrent_state)?.is_some() {
2828            bail!("cannot drop a subtask with an undelivered event");
2829        }
2830
2831        if delete {
2832            waitable.delete_from(concurrent_state)?;
2833        }
2834
2835        // Since waitables can neither be passed between instances nor forged,
2836        // this should never fail unless there's a bug in Wasmtime, but we check
2837        // here to be sure:
2838        assert_eq!(expected_caller_instance, caller_instance);
2839        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2840        Ok(())
2841    }
2842
2843    /// Implements the `waitable-set.wait` intrinsic.
2844    pub(crate) fn waitable_set_wait(
2845        self,
2846        store: &mut StoreOpaque,
2847        caller: RuntimeComponentInstanceIndex,
2848        options: OptionsIndex,
2849        set: u32,
2850        payload: u32,
2851    ) -> Result<u32> {
2852        self.id().get(store).check_may_leave(caller)?;
2853        let &CanonicalOptions {
2854            cancellable,
2855            instance: caller_instance,
2856            ..
2857        } = &self.id().get(store).component().env_component().options[options];
2858        let rep =
2859            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2860
2861        self.waitable_check(
2862            store,
2863            cancellable,
2864            WaitableCheck::Wait(WaitableCheckParams {
2865                set: TableId::new(rep),
2866                options,
2867                payload,
2868            }),
2869        )
2870    }
2871
2872    /// Implements the `waitable-set.poll` intrinsic.
2873    pub(crate) fn waitable_set_poll(
2874        self,
2875        store: &mut StoreOpaque,
2876        caller: RuntimeComponentInstanceIndex,
2877        options: OptionsIndex,
2878        set: u32,
2879        payload: u32,
2880    ) -> Result<u32> {
2881        self.id().get(store).check_may_leave(caller)?;
2882        let &CanonicalOptions {
2883            cancellable,
2884            instance: caller_instance,
2885            ..
2886        } = &self.id().get(store).component().env_component().options[options];
2887        let rep =
2888            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2889
2890        self.waitable_check(
2891            store,
2892            cancellable,
2893            WaitableCheck::Poll(WaitableCheckParams {
2894                set: TableId::new(rep),
2895                options,
2896                payload,
2897            }),
2898        )
2899    }
2900
2901    /// Implements the `thread.index` intrinsic.
2902    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2903        let thread_id = store
2904            .concurrent_state_mut()
2905            .guest_thread
2906            .ok_or_else(|| anyhow!("no current thread"))?
2907            .thread;
2908        // The unwrap is safe because `instance_rep` must be `Some` by this point
2909        Ok(store
2910            .concurrent_state_mut()
2911            .get_mut(thread_id)?
2912            .instance_rep
2913            .unwrap())
2914    }
2915
2916    /// Implements the `thread.new-indirect` intrinsic.
2917    pub(crate) fn thread_new_indirect<T: 'static>(
2918        self,
2919        mut store: StoreContextMut<T>,
2920        runtime_instance: RuntimeComponentInstanceIndex,
2921        _func_ty_idx: TypeFuncIndex, // currently unused
2922        start_func_table_idx: RuntimeTableIndex,
2923        start_func_idx: u32,
2924        context: i32,
2925    ) -> Result<u32> {
2926        self.id().get(store.0).check_may_leave(runtime_instance)?;
2927
2928        log::trace!("creating new thread");
2929
2930        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2931        let instance = self.id().get_mut(store.0);
2932        let callee = instance
2933            .index_runtime_func_table(start_func_table_idx, start_func_idx as u64)?
2934            .ok_or_else(|| {
2935                anyhow!("the start function index points to an uninitialized function")
2936            })?;
2937        if callee.type_index(store.0) != start_func_ty.type_index() {
2938            bail!(
2939                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2940            );
2941        }
2942
2943        let token = StoreToken::new(store.as_context_mut());
2944        let start_func = Box::new(
2945            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2946                let old_thread = store
2947                    .concurrent_state_mut()
2948                    .guest_thread
2949                    .replace(guest_thread);
2950                log::trace!(
2951                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2952                );
2953
2954                store.maybe_push_call_context(guest_thread.task)?;
2955
2956                let mut store = token.as_context_mut(store);
2957                let mut params = [ValRaw::i32(context)];
2958                // Use call_unchecked rather than call or call_async, as we don't want to run the function
2959                // on a separate fiber if we're running in an async store.
2960                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2961
2962                store.0.maybe_pop_call_context(guest_thread.task)?;
2963
2964                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2965                log::trace!("explicit thread {guest_thread:?} completed");
2966                let state = store.0.concurrent_state_mut();
2967                let task = state.get_mut(guest_thread.task)?;
2968                if task.threads.is_empty() && !task.returned_or_cancelled() {
2969                    bail!(Trap::NoAsyncResult);
2970                }
2971                state.guest_thread = old_thread;
2972                old_thread
2973                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2974                if state.get_mut(guest_thread.task)?.ready_to_delete() {
2975                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2976                }
2977                log::trace!("thread start: restored {old_thread:?} as current thread");
2978
2979                Ok(())
2980            },
2981        );
2982
2983        let state = store.0.concurrent_state_mut();
2984        let current_thread = state.guest_thread.unwrap();
2985        let parent_task = current_thread.task;
2986
2987        let new_thread = GuestThread::new_explicit(parent_task, start_func);
2988        let thread_id = state.push(new_thread)?;
2989        state.get_mut(parent_task)?.threads.insert(thread_id);
2990
2991        log::trace!("new thread with id {thread_id:?} created");
2992
2993        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
2994    }
2995
2996    pub(crate) fn resume_suspended_thread(
2997        self,
2998        store: &mut StoreOpaque,
2999        runtime_instance: RuntimeComponentInstanceIndex,
3000        thread_idx: u32,
3001        high_priority: bool,
3002    ) -> Result<()> {
3003        let thread_id =
3004            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3005        let state = store.concurrent_state_mut();
3006        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3007        let thread = state.get_mut(guest_thread.thread)?;
3008
3009        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3010            GuestThreadState::NotStartedExplicit(start_func) => {
3011                log::trace!("starting thread {guest_thread:?}");
3012                let guest_call = WorkItem::GuestCall(GuestCall {
3013                    thread: guest_thread,
3014                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3015                        start_func(store, guest_thread)
3016                    })),
3017                });
3018                store
3019                    .concurrent_state_mut()
3020                    .push_work_item(guest_call, high_priority);
3021            }
3022            GuestThreadState::Suspended(fiber) => {
3023                log::trace!("resuming thread {thread_id:?} that was suspended");
3024                store
3025                    .concurrent_state_mut()
3026                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3027            }
3028            _ => {
3029                bail!("cannot resume thread which is not suspended");
3030            }
3031        }
3032        Ok(())
3033    }
3034
3035    fn add_guest_thread_to_instance_table(
3036        self,
3037        thread_id: TableId<GuestThread>,
3038        store: &mut StoreOpaque,
3039        runtime_instance: RuntimeComponentInstanceIndex,
3040    ) -> Result<u32> {
3041        let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3042            .guest_thread_insert(thread_id.rep())?;
3043        store
3044            .concurrent_state_mut()
3045            .get_mut(thread_id)?
3046            .instance_rep = Some(guest_id);
3047        Ok(guest_id)
3048    }
3049
3050    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3051    /// and `thread.switch-to` intrinsics.
3052    pub(crate) fn suspension_intrinsic(
3053        self,
3054        store: &mut StoreOpaque,
3055        caller: RuntimeComponentInstanceIndex,
3056        cancellable: bool,
3057        yielding: bool,
3058        to_thread: Option<u32>,
3059    ) -> Result<WaitResult> {
3060        // There could be a pending cancellation from a previous uncancellable wait
3061        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3062            return Ok(WaitResult::Cancelled);
3063        }
3064
3065        self.id().get(store).check_may_leave(caller)?;
3066
3067        if let Some(thread) = to_thread {
3068            self.resume_suspended_thread(store, caller, thread, true)?;
3069        }
3070
3071        let state = store.concurrent_state_mut();
3072        let guest_thread = state.guest_thread.unwrap();
3073        let reason = if yielding {
3074            SuspendReason::Yielding {
3075                thread: guest_thread,
3076            }
3077        } else {
3078            SuspendReason::ExplicitlySuspending {
3079                thread: guest_thread,
3080            }
3081        };
3082
3083        store.suspend(reason)?;
3084
3085        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3086            Ok(WaitResult::Cancelled)
3087        } else {
3088            Ok(WaitResult::Completed)
3089        }
3090    }
3091
3092    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3093    fn waitable_check(
3094        self,
3095        store: &mut StoreOpaque,
3096        cancellable: bool,
3097        check: WaitableCheck,
3098    ) -> Result<u32> {
3099        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3100
3101        let (wait, set) = match &check {
3102            WaitableCheck::Wait(params) => (true, Some(params.set)),
3103            WaitableCheck::Poll(params) => (false, Some(params.set)),
3104        };
3105
3106        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3107        // First, suspend this fiber, allowing any other threads to run.
3108        store.suspend(SuspendReason::Yielding {
3109            thread: guest_thread,
3110        })?;
3111
3112        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3113
3114        let state = store.concurrent_state_mut();
3115        let task = state.get_mut(guest_thread.task)?;
3116
3117        // If we're waiting, and there are no events immediately available,
3118        // suspend the fiber until that changes.
3119        if wait {
3120            let set = set.unwrap();
3121
3122            if (task.event.is_none()
3123                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3124                && state.get_mut(set)?.ready.is_empty()
3125            {
3126                if cancellable {
3127                    let old = state
3128                        .get_mut(guest_thread.thread)?
3129                        .wake_on_cancel
3130                        .replace(set);
3131                    assert!(old.is_none());
3132                }
3133
3134                store.suspend(SuspendReason::Waiting {
3135                    set,
3136                    thread: guest_thread,
3137                })?;
3138            }
3139        }
3140
3141        log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3142
3143        let result = match check {
3144            // Deliver any pending events to the guest and return.
3145            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3146                let event =
3147                    self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3148
3149                let (ordinal, handle, result) = if wait {
3150                    let (event, waitable) = event.unwrap();
3151                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3152                    let (ordinal, result) = event.parts();
3153                    (ordinal, handle, result)
3154                } else {
3155                    if let Some((event, waitable)) = event {
3156                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3157                        let (ordinal, result) = event.parts();
3158                        (ordinal, handle, result)
3159                    } else {
3160                        log::trace!(
3161                            "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3162                            guest_thread.task,
3163                            params.set
3164                        );
3165                        let (ordinal, result) = Event::None.parts();
3166                        (ordinal, 0, result)
3167                    }
3168                };
3169                let memory = self.options_memory_mut(store, params.options);
3170                let ptr =
3171                    func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
3172                memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3173                memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3174                Ok(ordinal)
3175            }
3176        };
3177
3178        result
3179    }
3180
3181    /// Implements the `subtask.cancel` intrinsic.
3182    pub(crate) fn subtask_cancel(
3183        self,
3184        store: &mut StoreOpaque,
3185        caller_instance: RuntimeComponentInstanceIndex,
3186        async_: bool,
3187        task_id: u32,
3188    ) -> Result<u32> {
3189        self.id().get(store).check_may_leave(caller_instance)?;
3190        let (rep, is_host) =
3191            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
3192        let (waitable, expected_caller_instance) = if is_host {
3193            let id = TableId::<HostTask>::new(rep);
3194            (
3195                Waitable::Host(id),
3196                store.concurrent_state_mut().get_mut(id)?.caller_instance,
3197            )
3198        } else {
3199            let id = TableId::<GuestTask>::new(rep);
3200            if let Caller::Guest { instance, .. } =
3201                &store.concurrent_state_mut().get_mut(id)?.caller
3202            {
3203                (Waitable::Guest(id), *instance)
3204            } else {
3205                unreachable!()
3206            }
3207        };
3208        // Since waitables can neither be passed between instances nor forged,
3209        // this should never fail unless there's a bug in Wasmtime, but we check
3210        // here to be sure:
3211        assert_eq!(expected_caller_instance, caller_instance);
3212
3213        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3214
3215        let concurrent_state = store.concurrent_state_mut();
3216        if let Waitable::Host(host_task) = waitable {
3217            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3218                handle.abort();
3219                return Ok(Status::ReturnCancelled as u32);
3220            }
3221        } else {
3222            let caller = concurrent_state.guest_thread.unwrap();
3223            let guest_task = TableId::<GuestTask>::new(rep);
3224            let task = concurrent_state.get_mut(guest_task)?;
3225            if !task.already_lowered_parameters() {
3226                task.lower_params = None;
3227                task.lift_result = None;
3228
3229                // Not yet started; cancel and remove from pending
3230                let callee_instance = task.instance;
3231
3232                let pending = &mut concurrent_state.instance_state(callee_instance).pending;
3233                let pending_count = pending.len();
3234                pending.retain(|thread, _| thread.task != guest_task);
3235                // If there were no pending threads for this task, we're in an error state
3236                if pending.len() == pending_count {
3237                    bail!("`subtask.cancel` called after terminal status delivered");
3238                }
3239                return Ok(Status::StartCancelled as u32);
3240            } else if !task.returned_or_cancelled() {
3241                // Started, but not yet returned or cancelled; send the
3242                // `CANCELLED` event
3243                task.cancel_sent = true;
3244                // Note that this might overwrite an event that was set earlier
3245                // (e.g. `Event::None` if the task is yielding, or
3246                // `Event::Cancelled` if it was already cancelled), but that's
3247                // okay -- this should supersede the previous state.
3248                task.event = Some(Event::Cancelled);
3249                for thread in task.threads.clone() {
3250                    let thread = QualifiedThreadId {
3251                        task: guest_task,
3252                        thread,
3253                    };
3254                    if let Some(set) = concurrent_state
3255                        .get_mut(thread.thread)
3256                        .unwrap()
3257                        .wake_on_cancel
3258                        .take()
3259                    {
3260                        let item = match concurrent_state
3261                            .get_mut(set)?
3262                            .waiting
3263                            .remove(&thread)
3264                            .unwrap()
3265                        {
3266                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3267                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3268                                thread,
3269                                kind: GuestCallKind::DeliverEvent {
3270                                    instance,
3271                                    set: None,
3272                                },
3273                            }),
3274                        };
3275                        concurrent_state.push_high_priority(item);
3276
3277                        store.suspend(SuspendReason::Yielding { thread: caller })?;
3278                        break;
3279                    }
3280                }
3281
3282                let concurrent_state = store.concurrent_state_mut();
3283                let task = concurrent_state.get_mut(guest_task)?;
3284                if !task.returned_or_cancelled() {
3285                    if async_ {
3286                        return Ok(BLOCKED);
3287                    } else {
3288                        store.wait_for_event(Waitable::Guest(guest_task))?;
3289                    }
3290                }
3291            }
3292        }
3293
3294        let event = waitable.take_event(store.concurrent_state_mut())?;
3295        if let Some(Event::Subtask {
3296            status: status @ (Status::Returned | Status::ReturnCancelled),
3297        }) = event
3298        {
3299            Ok(status as u32)
3300        } else {
3301            bail!("`subtask.cancel` called after terminal status delivered");
3302        }
3303    }
3304
3305    pub(crate) fn context_get(
3306        self,
3307        store: &mut StoreOpaque,
3308        caller: RuntimeComponentInstanceIndex,
3309        slot: u32,
3310    ) -> Result<u32> {
3311        self.id().get(store).check_may_leave(caller)?;
3312        store.concurrent_state_mut().context_get(slot)
3313    }
3314
3315    pub(crate) fn context_set(
3316        self,
3317        store: &mut StoreOpaque,
3318        caller: RuntimeComponentInstanceIndex,
3319        slot: u32,
3320        value: u32,
3321    ) -> Result<()> {
3322        self.id().get(store).check_may_leave(caller)?;
3323        store.concurrent_state_mut().context_set(slot, value)
3324    }
3325}
3326
3327/// Trait representing component model ABI async intrinsics and fused adapter
3328/// helper functions.
3329///
3330/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3331/// which must be valid for at least the duration of the call (and possibly for
3332/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3333/// pointers used for async calls).
3334pub trait VMComponentAsyncStore {
3335    /// A helper function for fused adapter modules involving calls where the
3336    /// one of the caller or callee is async.
3337    ///
3338    /// This helper is not used when the caller and callee both use the sync
3339    /// ABI, only when at least one is async is this used.
3340    unsafe fn prepare_call(
3341        &mut self,
3342        instance: Instance,
3343        memory: *mut VMMemoryDefinition,
3344        start: *mut VMFuncRef,
3345        return_: *mut VMFuncRef,
3346        caller_instance: RuntimeComponentInstanceIndex,
3347        callee_instance: RuntimeComponentInstanceIndex,
3348        task_return_type: TypeTupleIndex,
3349        string_encoding: u8,
3350        result_count: u32,
3351        storage: *mut ValRaw,
3352        storage_len: usize,
3353    ) -> Result<()>;
3354
3355    /// A helper function for fused adapter modules involving calls where the
3356    /// caller is sync-lowered but the callee is async-lifted.
3357    unsafe fn sync_start(
3358        &mut self,
3359        instance: Instance,
3360        callback: *mut VMFuncRef,
3361        callee: *mut VMFuncRef,
3362        param_count: u32,
3363        storage: *mut MaybeUninit<ValRaw>,
3364        storage_len: usize,
3365    ) -> Result<()>;
3366
3367    /// A helper function for fused adapter modules involving calls where the
3368    /// caller is async-lowered.
3369    unsafe fn async_start(
3370        &mut self,
3371        instance: Instance,
3372        callback: *mut VMFuncRef,
3373        post_return: *mut VMFuncRef,
3374        callee: *mut VMFuncRef,
3375        param_count: u32,
3376        result_count: u32,
3377        flags: u32,
3378    ) -> Result<u32>;
3379
3380    /// The `future.write` intrinsic.
3381    fn future_write(
3382        &mut self,
3383        instance: Instance,
3384        caller: RuntimeComponentInstanceIndex,
3385        ty: TypeFutureTableIndex,
3386        options: OptionsIndex,
3387        future: u32,
3388        address: u32,
3389    ) -> Result<u32>;
3390
3391    /// The `future.read` intrinsic.
3392    fn future_read(
3393        &mut self,
3394        instance: Instance,
3395        caller: RuntimeComponentInstanceIndex,
3396        ty: TypeFutureTableIndex,
3397        options: OptionsIndex,
3398        future: u32,
3399        address: u32,
3400    ) -> Result<u32>;
3401
3402    /// The `future.drop-writable` intrinsic.
3403    fn future_drop_writable(
3404        &mut self,
3405        instance: Instance,
3406        caller: RuntimeComponentInstanceIndex,
3407        ty: TypeFutureTableIndex,
3408        writer: u32,
3409    ) -> Result<()>;
3410
3411    /// The `stream.write` intrinsic.
3412    fn stream_write(
3413        &mut self,
3414        instance: Instance,
3415        caller: RuntimeComponentInstanceIndex,
3416        ty: TypeStreamTableIndex,
3417        options: OptionsIndex,
3418        stream: u32,
3419        address: u32,
3420        count: u32,
3421    ) -> Result<u32>;
3422
3423    /// The `stream.read` intrinsic.
3424    fn stream_read(
3425        &mut self,
3426        instance: Instance,
3427        caller: RuntimeComponentInstanceIndex,
3428        ty: TypeStreamTableIndex,
3429        options: OptionsIndex,
3430        stream: u32,
3431        address: u32,
3432        count: u32,
3433    ) -> Result<u32>;
3434
3435    /// The "fast-path" implementation of the `stream.write` intrinsic for
3436    /// "flat" (i.e. memcpy-able) payloads.
3437    fn flat_stream_write(
3438        &mut self,
3439        instance: Instance,
3440        caller: RuntimeComponentInstanceIndex,
3441        ty: TypeStreamTableIndex,
3442        options: OptionsIndex,
3443        payload_size: u32,
3444        payload_align: u32,
3445        stream: u32,
3446        address: u32,
3447        count: u32,
3448    ) -> Result<u32>;
3449
3450    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3451    /// (i.e. memcpy-able) payloads.
3452    fn flat_stream_read(
3453        &mut self,
3454        instance: Instance,
3455        caller: RuntimeComponentInstanceIndex,
3456        ty: TypeStreamTableIndex,
3457        options: OptionsIndex,
3458        payload_size: u32,
3459        payload_align: u32,
3460        stream: u32,
3461        address: u32,
3462        count: u32,
3463    ) -> Result<u32>;
3464
3465    /// The `stream.drop-writable` intrinsic.
3466    fn stream_drop_writable(
3467        &mut self,
3468        instance: Instance,
3469        caller: RuntimeComponentInstanceIndex,
3470        ty: TypeStreamTableIndex,
3471        writer: u32,
3472    ) -> Result<()>;
3473
3474    /// The `error-context.debug-message` intrinsic.
3475    fn error_context_debug_message(
3476        &mut self,
3477        instance: Instance,
3478        caller: RuntimeComponentInstanceIndex,
3479        ty: TypeComponentLocalErrorContextTableIndex,
3480        options: OptionsIndex,
3481        err_ctx_handle: u32,
3482        debug_msg_address: u32,
3483    ) -> Result<()>;
3484
3485    /// The `thread.new-indirect` intrinsic
3486    fn thread_new_indirect(
3487        &mut self,
3488        instance: Instance,
3489        caller: RuntimeComponentInstanceIndex,
3490        func_ty_idx: TypeFuncIndex,
3491        start_func_table_idx: RuntimeTableIndex,
3492        start_func_idx: u32,
3493        context: i32,
3494    ) -> Result<u32>;
3495}
3496
3497/// SAFETY: See trait docs.
3498impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3499    unsafe fn prepare_call(
3500        &mut self,
3501        instance: Instance,
3502        memory: *mut VMMemoryDefinition,
3503        start: *mut VMFuncRef,
3504        return_: *mut VMFuncRef,
3505        caller_instance: RuntimeComponentInstanceIndex,
3506        callee_instance: RuntimeComponentInstanceIndex,
3507        task_return_type: TypeTupleIndex,
3508        string_encoding: u8,
3509        result_count_or_max_if_async: u32,
3510        storage: *mut ValRaw,
3511        storage_len: usize,
3512    ) -> Result<()> {
3513        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3514        // this method will have ensured that `storage` is a valid
3515        // pointer containing at least `storage_len` items.
3516        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3517
3518        unsafe {
3519            instance.prepare_call(
3520                StoreContextMut(self),
3521                start,
3522                return_,
3523                caller_instance,
3524                callee_instance,
3525                task_return_type,
3526                memory,
3527                string_encoding,
3528                match result_count_or_max_if_async {
3529                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3530                        params,
3531                        has_result: false,
3532                    },
3533                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3534                        params,
3535                        has_result: true,
3536                    },
3537                    result_count => CallerInfo::Sync {
3538                        params,
3539                        result_count,
3540                    },
3541                },
3542            )
3543        }
3544    }
3545
3546    unsafe fn sync_start(
3547        &mut self,
3548        instance: Instance,
3549        callback: *mut VMFuncRef,
3550        callee: *mut VMFuncRef,
3551        param_count: u32,
3552        storage: *mut MaybeUninit<ValRaw>,
3553        storage_len: usize,
3554    ) -> Result<()> {
3555        unsafe {
3556            instance
3557                .start_call(
3558                    StoreContextMut(self),
3559                    callback,
3560                    ptr::null_mut(),
3561                    callee,
3562                    param_count,
3563                    1,
3564                    START_FLAG_ASYNC_CALLEE,
3565                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3566                    // this method will have ensured that `storage` is a valid
3567                    // pointer containing at least `storage_len` items.
3568                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3569                )
3570                .map(drop)
3571        }
3572    }
3573
3574    unsafe fn async_start(
3575        &mut self,
3576        instance: Instance,
3577        callback: *mut VMFuncRef,
3578        post_return: *mut VMFuncRef,
3579        callee: *mut VMFuncRef,
3580        param_count: u32,
3581        result_count: u32,
3582        flags: u32,
3583    ) -> Result<u32> {
3584        unsafe {
3585            instance.start_call(
3586                StoreContextMut(self),
3587                callback,
3588                post_return,
3589                callee,
3590                param_count,
3591                result_count,
3592                flags,
3593                None,
3594            )
3595        }
3596    }
3597
3598    fn future_write(
3599        &mut self,
3600        instance: Instance,
3601        caller: RuntimeComponentInstanceIndex,
3602        ty: TypeFutureTableIndex,
3603        options: OptionsIndex,
3604        future: u32,
3605        address: u32,
3606    ) -> Result<u32> {
3607        instance.id().get(self).check_may_leave(caller)?;
3608        instance
3609            .guest_write(
3610                StoreContextMut(self),
3611                TransmitIndex::Future(ty),
3612                options,
3613                None,
3614                future,
3615                address,
3616                1,
3617            )
3618            .map(|result| result.encode())
3619    }
3620
3621    fn future_read(
3622        &mut self,
3623        instance: Instance,
3624        caller: RuntimeComponentInstanceIndex,
3625        ty: TypeFutureTableIndex,
3626        options: OptionsIndex,
3627        future: u32,
3628        address: u32,
3629    ) -> Result<u32> {
3630        instance.id().get(self).check_may_leave(caller)?;
3631        instance
3632            .guest_read(
3633                StoreContextMut(self),
3634                TransmitIndex::Future(ty),
3635                options,
3636                None,
3637                future,
3638                address,
3639                1,
3640            )
3641            .map(|result| result.encode())
3642    }
3643
3644    fn stream_write(
3645        &mut self,
3646        instance: Instance,
3647        caller: RuntimeComponentInstanceIndex,
3648        ty: TypeStreamTableIndex,
3649        options: OptionsIndex,
3650        stream: u32,
3651        address: u32,
3652        count: u32,
3653    ) -> Result<u32> {
3654        instance.id().get(self).check_may_leave(caller)?;
3655        instance
3656            .guest_write(
3657                StoreContextMut(self),
3658                TransmitIndex::Stream(ty),
3659                options,
3660                None,
3661                stream,
3662                address,
3663                count,
3664            )
3665            .map(|result| result.encode())
3666    }
3667
3668    fn stream_read(
3669        &mut self,
3670        instance: Instance,
3671        caller: RuntimeComponentInstanceIndex,
3672        ty: TypeStreamTableIndex,
3673        options: OptionsIndex,
3674        stream: u32,
3675        address: u32,
3676        count: u32,
3677    ) -> Result<u32> {
3678        instance.id().get(self).check_may_leave(caller)?;
3679        instance
3680            .guest_read(
3681                StoreContextMut(self),
3682                TransmitIndex::Stream(ty),
3683                options,
3684                None,
3685                stream,
3686                address,
3687                count,
3688            )
3689            .map(|result| result.encode())
3690    }
3691
3692    fn future_drop_writable(
3693        &mut self,
3694        instance: Instance,
3695        caller: RuntimeComponentInstanceIndex,
3696        ty: TypeFutureTableIndex,
3697        writer: u32,
3698    ) -> Result<()> {
3699        instance.id().get(self).check_may_leave(caller)?;
3700        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3701    }
3702
3703    fn flat_stream_write(
3704        &mut self,
3705        instance: Instance,
3706        caller: RuntimeComponentInstanceIndex,
3707        ty: TypeStreamTableIndex,
3708        options: OptionsIndex,
3709        payload_size: u32,
3710        payload_align: u32,
3711        stream: u32,
3712        address: u32,
3713        count: u32,
3714    ) -> Result<u32> {
3715        instance.id().get(self).check_may_leave(caller)?;
3716        instance
3717            .guest_write(
3718                StoreContextMut(self),
3719                TransmitIndex::Stream(ty),
3720                options,
3721                Some(FlatAbi {
3722                    size: payload_size,
3723                    align: payload_align,
3724                }),
3725                stream,
3726                address,
3727                count,
3728            )
3729            .map(|result| result.encode())
3730    }
3731
3732    fn flat_stream_read(
3733        &mut self,
3734        instance: Instance,
3735        caller: RuntimeComponentInstanceIndex,
3736        ty: TypeStreamTableIndex,
3737        options: OptionsIndex,
3738        payload_size: u32,
3739        payload_align: u32,
3740        stream: u32,
3741        address: u32,
3742        count: u32,
3743    ) -> Result<u32> {
3744        instance.id().get(self).check_may_leave(caller)?;
3745        instance
3746            .guest_read(
3747                StoreContextMut(self),
3748                TransmitIndex::Stream(ty),
3749                options,
3750                Some(FlatAbi {
3751                    size: payload_size,
3752                    align: payload_align,
3753                }),
3754                stream,
3755                address,
3756                count,
3757            )
3758            .map(|result| result.encode())
3759    }
3760
3761    fn stream_drop_writable(
3762        &mut self,
3763        instance: Instance,
3764        caller: RuntimeComponentInstanceIndex,
3765        ty: TypeStreamTableIndex,
3766        writer: u32,
3767    ) -> Result<()> {
3768        instance.id().get(self).check_may_leave(caller)?;
3769        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3770    }
3771
3772    fn error_context_debug_message(
3773        &mut self,
3774        instance: Instance,
3775        caller: RuntimeComponentInstanceIndex,
3776        ty: TypeComponentLocalErrorContextTableIndex,
3777        options: OptionsIndex,
3778        err_ctx_handle: u32,
3779        debug_msg_address: u32,
3780    ) -> Result<()> {
3781        instance.id().get(self).check_may_leave(caller)?;
3782        instance.error_context_debug_message(
3783            StoreContextMut(self),
3784            ty,
3785            options,
3786            err_ctx_handle,
3787            debug_msg_address,
3788        )
3789    }
3790
3791    fn thread_new_indirect(
3792        &mut self,
3793        instance: Instance,
3794        caller: RuntimeComponentInstanceIndex,
3795        func_ty_idx: TypeFuncIndex,
3796        start_func_table_idx: RuntimeTableIndex,
3797        start_func_idx: u32,
3798        context: i32,
3799    ) -> Result<u32> {
3800        instance.thread_new_indirect(
3801            StoreContextMut(self),
3802            caller,
3803            func_ty_idx,
3804            start_func_table_idx,
3805            start_func_idx,
3806            context,
3807        )
3808    }
3809}
3810
/// Boxed, pinned, `Send + 'static` future resolving to `Result<()>`.
///
/// This is the element type of `ConcurrentState::futures` (the set of pending
/// host and background tasks) and the argument type of
/// `ConcurrentState::push_future`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3812
/// Represents the state of a pending host task.
struct HostTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// The component instance index recorded when the task was created
    /// (presumably the instance which called the host function; confirm
    /// against the call sites of `HostTask::new`).
    caller_instance: RuntimeComponentInstanceIndex,
    /// Optional handle supplied at construction time.
    ///
    /// NOTE(review): presumably used to abort or join the underlying host
    /// future -- confirm against the users of this field.
    join_handle: Option<JoinHandle>,
}
3819
3820impl HostTask {
3821    fn new(
3822        caller_instance: RuntimeComponentInstanceIndex,
3823        join_handle: Option<JoinHandle>,
3824    ) -> Self {
3825        Self {
3826            common: WaitableCommon::default(),
3827            caller_instance,
3828            join_handle,
3829        }
3830    }
3831}
3832
/// Gives `HostTask` table entries a human-readable type name.
impl TableDebug for HostTask {
    /// Returns the literal name `"HostTask"`.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
3838
/// Type-erased closure stored in `GuestTask::callback`, used to call the
/// callback function of an async-lifted export.
///
/// The closure receives the store, a component instance index, the `Event`
/// being delivered and a `u32`, and yields a `u32` on success.
///
/// NOTE(review): the meaning of the `u32` argument and return value is not
/// visible in this part of the file (presumably an event payload and the
/// callback's return code) -- confirm against the call site.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
3845
/// Represents the caller of a given guest task.
///
/// When a task is disposed, its still-pending subtasks are re-parented to this
/// caller (see `GuestTask::dispose`).
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Channel to notify once all subtasks spawned by this caller have
        /// completed.
        ///
        /// Note that we'll never actually send anything to this channel;
        /// dropping it when the refcount goes to zero is sufficient to notify
        /// the receiver.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        ///
        /// `GuestTask::new` turns this into the task's initial
        /// `HostFutureState` (`Live` when true, `NotApplicable` otherwise).
        host_future_present: bool,
        /// If true, call `post-return` function (if any) automatically.
        ///
        /// Read back through `GuestTask::call_post_return_automatically`.
        call_post_return_automatically: bool,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the caller
        thread: QualifiedThreadId,
        /// The instance to use to enforce reentrance rules.
        ///
        /// Note that this might not be the same as the instance the caller task
        /// started executing in given that one or more synchronous guest->guest
        /// calls may have occurred involving multiple instances.
        instance: RuntimeComponentInstanceIndex,
    },
}
3877
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
///
/// A `GuestTask` holds one of these in `lift_result` until the task has
/// returned or been cancelled (see `GuestTask::returned_or_cancelled`).
struct LiftResult {
    /// Type-erased closure which lifts the result; see `RawLift`.
    lift: RawLift,
    /// The tuple type associated with the result (presumably the expected
    /// result type checked on `task.return` -- confirm against the users of
    /// this field).
    ty: TypeTupleIndex,
    /// Optional linear memory definition.
    ///
    /// NOTE(review): presumably compared with the memory named by the
    /// `task.return` options during validation -- confirm.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding (presumably validated the same way as `memory` --
    /// confirm).
    string_encoding: StringEncoding,
}
3886
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
///
/// Note that the derived ordering compares `task` first and `thread` second,
/// so the declaration order of the fields determines the iteration order of
/// ordered collections keyed by this type (e.g. `WaitableSet::waiting` and
/// `InstanceState::pending`).
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task which owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
3896
3897impl QualifiedThreadId {
3898    fn qualify(
3899        state: &mut ConcurrentState,
3900        thread: TableId<GuestThread>,
3901    ) -> Result<QualifiedThreadId> {
3902        Ok(QualifiedThreadId {
3903            task: state.get_mut(thread)?.parent_task,
3904            thread,
3905        })
3906    }
3907}
3908
3909impl fmt::Debug for QualifiedThreadId {
3910    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3911        f.debug_tuple("QualifiedThreadId")
3912            .field(&self.task.rep())
3913            .field(&self.thread.rep())
3914            .finish()
3915    }
3916}
3917
/// The execution state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`:
    /// not yet started, and carrying no start closure.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`:
    /// not yet started, holding the `start_func` closure that was supplied.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// Presumably: the thread is currently executing -- confirm.
    Running,
    /// The thread's fiber is parked here.
    /// `ConcurrentState::take_fibers_and_futures` collects such fibers when
    /// the store is being disposed.
    Suspended(StoreFiber<'static>),
    /// NOTE(review): meaning not evident from this part of the file
    /// (presumably the thread is waiting to be resumed without owning a
    /// fiber) -- confirm.
    Pending,
    /// Presumably: the thread has finished.  This is also the state
    /// `ConcurrentState::take_fibers_and_futures` leaves behind for every
    /// thread it visits.
    Completed,
}
/// Represents a guest thread belonging to a `GuestTask`.
///
/// Threads are created with `GuestThread::new_implicit` or
/// `GuestThread::new_explicit`, which differ only in the initial
/// `GuestThreadState`.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    ///
    /// Both constructors leave this as `None`; it has to be filled in
    /// afterwards by whoever registers the thread.
    instance_rep: Option<u32>,
}
3943
3944impl GuestThread {
3945    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
3946    /// handle.
3947    fn from_instance(
3948        state: Pin<&mut ComponentInstance>,
3949        caller_instance: RuntimeComponentInstanceIndex,
3950        guest_thread: u32,
3951    ) -> Result<TableId<Self>> {
3952        let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3953        Ok(TableId::new(rep))
3954    }
3955
3956    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3957        Self {
3958            context: [0; 2],
3959            parent_task,
3960            wake_on_cancel: None,
3961            state: GuestThreadState::NotStartedImplicit,
3962            instance_rep: None,
3963        }
3964    }
3965
3966    fn new_explicit(
3967        parent_task: TableId<GuestTask>,
3968        start_func: Box<
3969            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3970        >,
3971    ) -> Self {
3972        Self {
3973            context: [0; 2],
3974            parent_task,
3975            wake_on_cancel: None,
3976            state: GuestThreadState::NotStartedExplicit(start_func),
3977            instance_rep: None,
3978        }
3979    }
3980}
3981
/// Gives `GuestThread` table entries a human-readable type name.
impl TableDebug for GuestThread {
    /// Returns the literal name `"GuestThread"`.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
3987
/// Tracks the lowered result of a sync-to-async call (see
/// `GuestTask::sync_result`).
enum SyncResult {
    /// No result has been stored; this is the state a new `GuestTask` starts
    /// in.
    NotProduced,
    /// A result has been stored and not yet taken.  While a task is in this
    /// state `GuestTask::ready_to_delete` reports `false`.
    Produced(Option<ValRaw>),
    /// `SyncResult::take` has been called; calling it again panics.
    Taken,
}
3993
3994impl SyncResult {
3995    fn take(&mut self) -> Option<Option<ValRaw>> {
3996        match mem::replace(self, SyncResult::Taken) {
3997            SyncResult::NotProduced => None,
3998            SyncResult::Produced(val) => Some(val),
3999            SyncResult::Taken => {
4000                panic!("attempted to take a synchronous result that was already taken")
4001            }
4002        }
4003    }
4004}
4005
/// State of the host future (if any) that represents an async task; see
/// `GuestTask::host_future_state`.
#[derive(Debug)]
enum HostFutureState {
    /// There is no host future to wait for.  `GuestTask::new` picks this for
    /// guest callers and for host callers with `host_future_present` unset.
    NotApplicable,
    /// A host future exists and has not been dropped; while in this state
    /// `GuestTask::ready_to_delete` reports `false`.
    Live,
    /// Presumably: the host future has been dropped, so it no longer blocks
    /// deletion -- confirm against the code that sets this variant.
    Dropped,
}
4012
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// Reset to `None` once the parameters have been lowered; see
    /// `GuestTask::already_lowered_parameters`.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Reset to `None` once the task has returned; see
    /// `GuestTask::returned_or_cancelled`.
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// A place to stash the call context for managing resource borrows while
    /// switching between guest tasks.
    call_context: Option<CallContext>,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.  See `SyncResult`.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// Pending guest subtasks created by this task (directly or indirectly).
    ///
    /// This is used to re-parent subtasks which are still running when their
    /// parent task is disposed.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// Created by `GuestTask::new` and deleted again by `GuestTask::dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The instance to which the exported function for this guest task belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeComponentInstanceIndex,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// The `ExportIndex` of the guest function being called, if known.
    function_index: Option<ExportIndex>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    ///
    /// Must be empty before the task can be deleted or disposed.
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.  See `HostFutureState`.
    host_future_state: HostFutureState,
}
4067
4068impl GuestTask {
4069    fn already_lowered_parameters(&self) -> bool {
4070        // We reset `lower_params` after we lower the parameters
4071        self.lower_params.is_none()
4072    }
4073    fn returned_or_cancelled(&self) -> bool {
4074        // We reset `lift_result` after we return or exit
4075        self.lift_result.is_none()
4076    }
4077    fn ready_to_delete(&self) -> bool {
4078        let threads_completed = self.threads.is_empty();
4079        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4080        let pending_completion_event = matches!(
4081            self.common.event,
4082            Some(Event::Subtask {
4083                status: Status::Returned | Status::ReturnCancelled
4084            })
4085        );
4086        let ready = threads_completed
4087            && !has_sync_result
4088            && !pending_completion_event
4089            && !matches!(self.host_future_state, HostFutureState::Live);
4090        log::trace!(
4091            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4092            threads_completed,
4093            has_sync_result,
4094            pending_completion_event,
4095            self.host_future_state
4096        );
4097        ready
4098    }
4099    fn new(
4100        state: &mut ConcurrentState,
4101        lower_params: RawLower,
4102        lift_result: LiftResult,
4103        caller: Caller,
4104        callback: Option<CallbackFn>,
4105        component_instance: RuntimeComponentInstanceIndex,
4106    ) -> Result<Self> {
4107        let sync_call_set = state.push(WaitableSet::default())?;
4108        let host_future_state = match &caller {
4109            Caller::Guest { .. } => HostFutureState::NotApplicable,
4110            Caller::Host {
4111                host_future_present,
4112                ..
4113            } => {
4114                if *host_future_present {
4115                    HostFutureState::Live
4116                } else {
4117                    HostFutureState::NotApplicable
4118                }
4119            }
4120        };
4121        Ok(Self {
4122            common: WaitableCommon::default(),
4123            lower_params: Some(lower_params),
4124            lift_result: Some(lift_result),
4125            result: None,
4126            callback,
4127            caller,
4128            call_context: Some(CallContext::default()),
4129            sync_result: SyncResult::NotProduced,
4130            cancel_sent: false,
4131            starting_sent: false,
4132            subtasks: HashSet::new(),
4133            sync_call_set,
4134            instance: component_instance,
4135            event: None,
4136            function_index: None,
4137            exited: false,
4138            threads: HashSet::new(),
4139            host_future_state,
4140        })
4141    }
4142
4143    /// Dispose of this guest task, reparenting any pending subtasks to the
4144    /// caller.
4145    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4146        // If there are not-yet-delivered completion events for subtasks in
4147        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4148        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4149            if let Some(Event::Subtask {
4150                status: Status::Returned | Status::ReturnCancelled,
4151            }) = waitable.common(state)?.event
4152            {
4153                waitable.delete_from(state)?;
4154            }
4155        }
4156
4157        assert!(self.threads.is_empty());
4158
4159        state.delete(self.sync_call_set)?;
4160
4161        // Reparent any pending subtasks to the caller.
4162        match &self.caller {
4163            Caller::Guest {
4164                thread,
4165                instance: runtime_instance,
4166            } => {
4167                let task_mut = state.get_mut(thread.task)?;
4168                let present = task_mut.subtasks.remove(&me);
4169                assert!(present);
4170
4171                for subtask in &self.subtasks {
4172                    task_mut.subtasks.insert(*subtask);
4173                }
4174
4175                for subtask in &self.subtasks {
4176                    state.get_mut(*subtask)?.caller = Caller::Guest {
4177                        thread: *thread,
4178                        instance: *runtime_instance,
4179                    };
4180                }
4181            }
4182            Caller::Host { exit_tx, .. } => {
4183                for subtask in &self.subtasks {
4184                    state.get_mut(*subtask)?.caller = Caller::Host {
4185                        tx: None,
4186                        // Clone `exit_tx` to ensure that it is only dropped
4187                        // once all transitive subtasks of the host call have
4188                        // exited:
4189                        exit_tx: exit_tx.clone(),
4190                        host_future_present: false,
4191                        call_post_return_automatically: true,
4192                    };
4193                }
4194            }
4195        }
4196
4197        for subtask in self.subtasks {
4198            if state.get_mut(subtask)?.exited {
4199                Waitable::Guest(subtask).delete_from(state)?;
4200            }
4201        }
4202
4203        Ok(())
4204    }
4205
4206    fn call_post_return_automatically(&self) -> bool {
4207        matches!(
4208            self.caller,
4209            Caller::Guest { .. }
4210                | Caller::Host {
4211                    call_post_return_automatically: true,
4212                    ..
4213                }
4214        )
4215    }
4216}
4217
/// Gives `GuestTask` table entries a human-readable type name.
impl TableDebug for GuestTask {
    /// Returns the literal name `"GuestTask"`.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4223
/// Represents state common to all kinds of waitables.
///
/// Each table entry that a `Waitable` can refer to embeds one of these as its
/// `common` field (see `Waitable::common`).  The default value has no event,
/// no set and no handle.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4234
/// Represents a Component Model Async `waitable`.
///
/// The derived ordering is what orders entries in `WaitableSet::ready`;
/// variants compare in declaration order before their ids are compared.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4245
4246impl Waitable {
4247    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4248    /// handle.
4249    fn from_instance(
4250        state: Pin<&mut ComponentInstance>,
4251        caller_instance: RuntimeComponentInstanceIndex,
4252        waitable: u32,
4253    ) -> Result<Self> {
4254        use crate::runtime::vm::component::Waitable;
4255
4256        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4257
4258        Ok(match kind {
4259            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4260            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4261            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4262        })
4263    }
4264
4265    /// Retrieve the host-visible identifier for this `Waitable`.
4266    fn rep(&self) -> u32 {
4267        match self {
4268            Self::Host(id) => id.rep(),
4269            Self::Guest(id) => id.rep(),
4270            Self::Transmit(id) => id.rep(),
4271        }
4272    }
4273
4274    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4275    /// remove it from any set it may currently belong to (when `set` is
4276    /// `None`).
4277    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4278        log::trace!("waitable {self:?} join set {set:?}",);
4279
4280        let old = mem::replace(&mut self.common(state)?.set, set);
4281
4282        if let Some(old) = old {
4283            match *self {
4284                Waitable::Host(id) => state.remove_child(id, old),
4285                Waitable::Guest(id) => state.remove_child(id, old),
4286                Waitable::Transmit(id) => state.remove_child(id, old),
4287            }?;
4288
4289            state.get_mut(old)?.ready.remove(self);
4290        }
4291
4292        if let Some(set) = set {
4293            match *self {
4294                Waitable::Host(id) => state.add_child(id, set),
4295                Waitable::Guest(id) => state.add_child(id, set),
4296                Waitable::Transmit(id) => state.add_child(id, set),
4297            }?;
4298
4299            if self.common(state)?.event.is_some() {
4300                self.mark_ready(state)?;
4301            }
4302        }
4303
4304        Ok(())
4305    }
4306
4307    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4308    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4309        Ok(match self {
4310            Self::Host(id) => &mut state.get_mut(*id)?.common,
4311            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4312            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4313        })
4314    }
4315
4316    /// Set or clear the pending event for this waitable and either deliver it
4317    /// to the first waiter, if any, or mark it as ready to be delivered to the
4318    /// next waiter that arrives.
4319    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4320        log::trace!("set event for {self:?}: {event:?}");
4321        self.common(state)?.event = event;
4322        self.mark_ready(state)
4323    }
4324
4325    /// Take the pending event from this waitable, leaving `None` in its place.
4326    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4327        let common = self.common(state)?;
4328        let event = common.event.take();
4329        if let Some(set) = self.common(state)?.set {
4330            state.get_mut(set)?.ready.remove(self);
4331        }
4332
4333        Ok(event)
4334    }
4335
4336    /// Deliver the current event for this waitable to the first waiter, if any,
4337    /// or else mark it as ready to be delivered to the next waiter that
4338    /// arrives.
4339    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4340        if let Some(set) = self.common(state)?.set {
4341            state.get_mut(set)?.ready.insert(*self);
4342            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4343                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4344                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4345
4346                let item = match mode {
4347                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4348                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4349                        thread,
4350                        kind: GuestCallKind::DeliverEvent {
4351                            instance,
4352                            set: Some(set),
4353                        },
4354                    }),
4355                };
4356                state.push_high_priority(item);
4357            }
4358        }
4359        Ok(())
4360    }
4361
4362    /// Remove this waitable from the instance's rep table.
4363    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4364        match self {
4365            Self::Host(task) => {
4366                log::trace!("delete host task {task:?}");
4367                state.delete(*task)?;
4368            }
4369            Self::Guest(task) => {
4370                log::trace!("delete guest task {task:?}");
4371                state.delete(*task)?.dispose(state, *task)?;
4372            }
4373            Self::Transmit(task) => {
4374                state.delete(*task)?;
4375            }
4376        }
4377
4378        Ok(())
4379    }
4380}
4381
4382impl fmt::Debug for Waitable {
4383    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4384        match self {
4385            Self::Host(id) => write!(f, "{id:?}"),
4386            Self::Guest(id) => write!(f, "{id:?}"),
4387            Self::Transmit(id) => write!(f, "{id:?}"),
4388        }
4389    }
4390}
4391
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any.
    ///
    /// `Waitable::mark_ready` wakes waiters via `pop_first`, i.e. the waiter
    /// with the smallest `QualifiedThreadId` goes first.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4400
/// Gives `WaitableSet` table entries a human-readable type name.
impl TableDebug for WaitableSet {
    /// Returns the literal name `"WaitableSet"`.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4406
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure is called at most once, with the store and a buffer of
/// possibly-uninitialized `ValRaw` slots (presumably the destination for the
/// lowered parameters -- confirm against the call site).
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4410
/// Type-erased closure to lift the result for a guest task.
///
/// The closure is called at most once, with the store and a slice of `ValRaw`
/// values, and yields a boxed `Any + Send + Sync` value on success (the same
/// type that `LiftedResult` names).
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4415
/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
///
/// This is the same boxed type that a `RawLift` closure produces.
type LiftedResult = Box<dyn Any + Send + Sync>;
4420
/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
///
/// NOTE(review): no `LiftFn` is defined in the visible part of this file; the
/// name may be a stale reference to `RawLift` -- confirm.
struct DummyResult;
4424
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
struct InstanceState {
    /// Backpressure counter for this instance; backpressure is enabled while
    /// this is greater than zero.
    backpressure: u16,
    /// Whether this instance can be entered
    do_not_enter: bool,
    /// Pending calls for this instance which cannot proceed until backpressure
    /// is disabled (i.e. `Self::backpressure` is zero) and/or
    /// `Self::do_not_enter` is false.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4436
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running guest thread, if any.
    guest_thread: Option<QualifiedThreadId>,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `StoreContextMut::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// Per (sub-)component instance states.
    ///
    /// See `InstanceState` for details and note that this map is lazily
    /// populated as needed.
    // TODO: this can and should be a `PrimaryMap`
    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    low_priority: Vec<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4485
4486impl Default for ConcurrentState {
4487    fn default() -> Self {
4488        Self {
4489            guest_thread: None,
4490            table: AlwaysMut::new(ResourceTable::new()),
4491            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4492            instance_states: HashMap::new(),
4493            high_priority: Vec::new(),
4494            low_priority: Vec::new(),
4495            suspend_reason: None,
4496            worker: None,
4497            worker_item: None,
4498            global_error_context_ref_counts: BTreeMap::new(),
4499        }
4500    }
4501}
4502
4503impl ConcurrentState {
4504    /// Take ownership of any fibers and futures owned by this object.
4505    ///
4506    /// This should be used when disposing of the `Store` containing this object
4507    /// in order to gracefully resolve any and all fibers using
4508    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4509    /// use-after-free bugs due to fibers which may still have access to the
4510    /// `Store`.
4511    ///
4512    /// Additionally, the futures collected with this function should be dropped
4513    /// within a `tls::set` call, which will ensure than any futures closing
4514    /// over an `&Accessor` will have access to the store when dropped, allowing
4515    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4516    /// panicking.
4517    ///
4518    /// Note that this will leave the object in an inconsistent and unusable
4519    /// state, so it should only be used just prior to dropping it.
4520    pub(crate) fn take_fibers_and_futures(
4521        &mut self,
4522        fibers: &mut Vec<StoreFiber<'static>>,
4523        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4524    ) {
4525        for entry in self.table.get_mut().iter_mut() {
4526            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4527                for mode in mem::take(&mut set.waiting).into_values() {
4528                    if let WaitMode::Fiber(fiber) = mode {
4529                        fibers.push(fiber);
4530                    }
4531                }
4532            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4533                if let GuestThreadState::Suspended(fiber) =
4534                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4535                {
4536                    fibers.push(fiber);
4537                }
4538            }
4539        }
4540
4541        if let Some(fiber) = self.worker.take() {
4542            fibers.push(fiber);
4543        }
4544
4545        let mut take_items = |list| {
4546            for item in mem::take(list) {
4547                match item {
4548                    WorkItem::ResumeFiber(fiber) => {
4549                        fibers.push(fiber);
4550                    }
4551                    WorkItem::PushFuture(future) => {
4552                        self.futures
4553                            .get_mut()
4554                            .as_mut()
4555                            .unwrap()
4556                            .push(future.into_inner());
4557                    }
4558                    _ => {}
4559                }
4560            }
4561        };
4562
4563        take_items(&mut self.high_priority);
4564        take_items(&mut self.low_priority);
4565
4566        if let Some(them) = self.futures.get_mut().take() {
4567            futures.push(them);
4568        }
4569    }
4570
4571    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4572        self.instance_states.entry(instance).or_default()
4573    }
4574
4575    fn push<V: Send + Sync + 'static>(
4576        &mut self,
4577        value: V,
4578    ) -> Result<TableId<V>, ResourceTableError> {
4579        self.table.get_mut().push(value).map(TableId::from)
4580    }
4581
4582    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4583        self.table.get_mut().get_mut(&Resource::from(id))
4584    }
4585
4586    pub fn add_child<T: 'static, U: 'static>(
4587        &mut self,
4588        child: TableId<T>,
4589        parent: TableId<U>,
4590    ) -> Result<(), ResourceTableError> {
4591        self.table
4592            .get_mut()
4593            .add_child(Resource::from(child), Resource::from(parent))
4594    }
4595
4596    pub fn remove_child<T: 'static, U: 'static>(
4597        &mut self,
4598        child: TableId<T>,
4599        parent: TableId<U>,
4600    ) -> Result<(), ResourceTableError> {
4601        self.table
4602            .get_mut()
4603            .remove_child(Resource::from(child), Resource::from(parent))
4604    }
4605
4606    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4607        self.table.get_mut().delete(Resource::from(id))
4608    }
4609
4610    fn push_future(&mut self, future: HostTaskFuture) {
4611        // Note that we can't directly push to `ConcurrentState::futures` here
4612        // since this may be called from a future that's being polled inside
4613        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4614        // so it has exclusive access while polling it.  Therefore, we push a
4615        // work item to the "high priority" queue, which will actually push to
4616        // `ConcurrentState::futures` later.
4617        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4618    }
4619
4620    fn push_high_priority(&mut self, item: WorkItem) {
4621        log::trace!("push high priority: {item:?}");
4622        self.high_priority.push(item);
4623    }
4624
4625    fn push_low_priority(&mut self, item: WorkItem) {
4626        log::trace!("push low priority: {item:?}");
4627        self.low_priority.push(item);
4628    }
4629
4630    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4631        if high_priority {
4632            self.push_high_priority(item);
4633        } else {
4634            self.push_low_priority(item);
4635        }
4636    }
4637
4638    /// Determine whether the instance associated with the specified guest task
4639    /// may be entered (i.e. is not already on the async call stack).
4640    ///
4641    /// This is an additional check on top of the "may_enter" instance flag;
4642    /// it's needed because async-lifted exports with callback functions must
4643    /// not call their own instances directly or indirectly, and due to the
4644    /// "stackless" nature of callback-enabled guest tasks this may happen even
4645    /// if there are no activation records on the stack (i.e. the "may_enter"
4646    /// field is `true`) for that instance.
4647    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4648        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4649
4650        // Walk the task tree back to the root, looking for potential
4651        // reentrance.
4652        //
4653        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4654        // such that each bit represents and instance which has been entered by
4655        // that task or an ancestor of that task, in which case this would be a
4656        // constant time check.
4657        loop {
4658            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4659                Caller::Host { .. } => break true,
4660                Caller::Guest { thread, instance } => {
4661                    if *instance == guest_instance {
4662                        break false;
4663                    } else {
4664                        *thread
4665                    }
4666                }
4667            };
4668            guest_task = next_thread.task;
4669        }
4670    }
4671
4672    /// Record that we're about to enter a (sub-)component instance which does
4673    /// not support more than one concurrent, stackful activation, meaning it
4674    /// cannot be entered again until the next call returns.
4675    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4676        self.instance_state(instance).do_not_enter = true;
4677    }
4678
4679    /// Record that we've exited a (sub-)component instance previously entered
4680    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
4681    /// See the documentation for the latter for details.
4682    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4683        self.instance_state(instance).do_not_enter = false;
4684        self.partition_pending(instance)
4685    }
4686
4687    /// Iterate over `InstanceState::pending`, moving any ready items into the
4688    /// "high priority" work item queue.
4689    ///
4690    /// See `GuestCall::is_ready` for details.
4691    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4692        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4693            let call = GuestCall { thread, kind };
4694            if call.is_ready(self)? {
4695                self.push_high_priority(WorkItem::GuestCall(call));
4696            } else {
4697                self.instance_state(instance)
4698                    .pending
4699                    .insert(call.thread, call.kind);
4700            }
4701        }
4702
4703        Ok(())
4704    }
4705
4706    /// Implements the `backpressure.{set,inc,dec}` intrinsics.
4707    pub(crate) fn backpressure_modify(
4708        &mut self,
4709        caller_instance: RuntimeComponentInstanceIndex,
4710        modify: impl FnOnce(u16) -> Option<u16>,
4711    ) -> Result<()> {
4712        let state = self.instance_state(caller_instance);
4713        let old = state.backpressure;
4714        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4715        state.backpressure = new;
4716
4717        if old > 0 && new == 0 {
4718            // Backpressure was previously enabled and is now disabled; move any
4719            // newly-eligible guest calls to the "high priority" queue.
4720            self.partition_pending(caller_instance)?;
4721        }
4722
4723        Ok(())
4724    }
4725
4726    /// Implements the `context.get` intrinsic.
4727    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4728        let thread = self.guest_thread.unwrap();
4729        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4730        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4731        Ok(val)
4732    }
4733
4734    /// Implements the `context.set` intrinsic.
4735    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4736        let thread = self.guest_thread.unwrap();
4737        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4738        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4739        Ok(())
4740    }
4741
4742    /// Returns whether there's a pending cancellation on the current guest thread,
4743    /// consuming the event if so.
4744    fn take_pending_cancellation(&mut self) -> bool {
4745        let thread = self.guest_thread.unwrap();
4746        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4747            assert!(matches!(event, Event::Cancelled));
4748            true
4749        } else {
4750            false
4751        }
4752    }
4753}
4754
4755/// Provide a type hint to compiler about the shape of a parameter lower
4756/// closure.
4757fn for_any_lower<
4758    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4759>(
4760    fun: F,
4761) -> F {
4762    fun
4763}
4764
4765/// Provide a type hint to compiler about the shape of a result lift closure.
4766fn for_any_lift<
4767    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4768>(
4769    fun: F,
4770) -> F {
4771    fun
4772}
4773
4774/// Wrap the specified future in a `poll_fn` which asserts that the future is
4775/// only polled from the event loop of the specified `Store`.
4776///
4777/// See `StoreContextMut::run_concurrent` for details.
4778fn checked<F: Future + Send + 'static>(
4779    id: StoreId,
4780    fut: F,
4781) -> impl Future<Output = F::Output> + Send + 'static {
4782    async move {
4783        let mut fut = pin!(fut);
4784        future::poll_fn(move |cx| {
4785            let message = "\
4786                `Future`s which depend on asynchronous component tasks, streams, or \
4787                futures to complete may only be polled from the event loop of the \
4788                store to which they belong.  Please use \
4789                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4790            ";
4791            tls::try_get(|store| {
4792                let matched = match store {
4793                    tls::TryGet::Some(store) => store.id() == id,
4794                    tls::TryGet::Taken | tls::TryGet::None => false,
4795                };
4796
4797                if !matched {
4798                    panic!("{message}")
4799                }
4800            });
4801            fut.as_mut().poll(cx)
4802        })
4803        .await
4804    }
4805}
4806
4807/// Assert that `StoreContextMut::run_concurrent` has not been called from
4808/// within an store's event loop.
4809fn check_recursive_run() {
4810    tls::try_get(|store| {
4811        if !matches!(store, tls::TryGet::None) {
4812            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4813        }
4814    });
4815}
4816
/// Split a packed callback return value into its two parts.
///
/// Returns `(low, high)`, where `low` is the lowest four bits of `code` and
/// `high` is the remaining upper bits shifted down by four.
// NOTE(review): presumably `low` is the callback code and `high` its
// argument (e.g. a waitable-set index) -- confirm against the callers.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low = code & 0xF;
    let high = code >> 4;
    (low, high)
}
4820
4821/// Helper struct for packaging parameters to be passed to
4822/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4823/// `waitable-set.poll`.
4824struct WaitableCheckParams {
4825    set: TableId<WaitableSet>,
4826    options: OptionsIndex,
4827    payload: u32,
4828}
4829
4830/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4831enum WaitableCheck {
4832    Wait(WaitableCheckParams),
4833    Poll(WaitableCheckParams),
4834}
4835
4836/// Represents a guest task called from the host, prepared using `prepare_call`.
4837pub(crate) struct PreparedCall<R> {
4838    /// The guest export to be called
4839    handle: Func,
4840    /// The guest thread created by `prepare_call`
4841    thread: QualifiedThreadId,
4842    /// The number of lowered core Wasm parameters to pass to the call.
4843    param_count: usize,
4844    /// The `oneshot::Receiver` to which the result of the call will be
4845    /// delivered when it is available.
4846    rx: oneshot::Receiver<LiftedResult>,
4847    /// The `oneshot::Receiver` which will resolve when the task -- and any
4848    /// transitive subtasks -- have all exited.
4849    exit_rx: oneshot::Receiver<()>,
4850    _phantom: PhantomData<R>,
4851}
4852
4853impl<R> PreparedCall<R> {
4854    /// Get a copy of the `TaskId` for this `PreparedCall`.
4855    pub(crate) fn task_id(&self) -> TaskId {
4856        TaskId {
4857            task: self.thread.task,
4858        }
4859    }
4860}
4861
4862/// Represents a task created by `prepare_call`.
4863pub(crate) struct TaskId {
4864    task: TableId<GuestTask>,
4865}
4866
4867impl TaskId {
4868    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
4869    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
4870    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
4871    /// and can be resumed by other tasks for this component, so we mark the future as dropped
4872    /// and delete the task when all threads are done.
4873    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4874        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4875        if !task.already_lowered_parameters() {
4876            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4877        } else {
4878            task.host_future_state = HostFutureState::Dropped;
4879            if task.ready_to_delete() {
4880                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4881            }
4882        }
4883        Ok(())
4884    }
4885}
4886
4887/// Prepare a call to the specified exported Wasm function, providing functions
4888/// for lowering the parameters and lifting the result.
4889///
4890/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4891/// loop, use `queue_call`.
4892pub(crate) fn prepare_call<T, R>(
4893    mut store: StoreContextMut<T>,
4894    handle: Func,
4895    param_count: usize,
4896    host_future_present: bool,
4897    call_post_return_automatically: bool,
4898    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4899    + Send
4900    + Sync
4901    + 'static,
4902    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4903    + Send
4904    + Sync
4905    + 'static,
4906) -> Result<PreparedCall<R>> {
4907    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4908
4909    let instance = handle.instance().id().get(store.0);
4910    let options = &instance.component().env_component().options[options];
4911    let task_return_type = instance.component().types()[ty].results;
4912    let component_instance = raw_options.instance;
4913    let callback = options.callback.map(|i| instance.runtime_callback(i));
4914    let memory = options
4915        .memory()
4916        .map(|i| instance.runtime_memory(i))
4917        .map(SendSyncPtr::new);
4918    let string_encoding = options.string_encoding;
4919    let token = StoreToken::new(store.as_context_mut());
4920    let state = store.0.concurrent_state_mut();
4921
4922    assert!(state.guest_thread.is_none());
4923
4924    let (tx, rx) = oneshot::channel();
4925    let (exit_tx, exit_rx) = oneshot::channel();
4926
4927    let mut task = GuestTask::new(
4928        state,
4929        Box::new(for_any_lower(move |store, params| {
4930            lower_params(handle, token.as_context_mut(store), params)
4931        })),
4932        LiftResult {
4933            lift: Box::new(for_any_lift(move |store, result| {
4934                lift_result(handle, store, result)
4935            })),
4936            ty: task_return_type,
4937            memory,
4938            string_encoding,
4939        },
4940        Caller::Host {
4941            tx: Some(tx),
4942            exit_tx: Arc::new(exit_tx),
4943            host_future_present,
4944            call_post_return_automatically,
4945        },
4946        callback.map(|callback| {
4947            let callback = SendSyncPtr::new(callback);
4948            let instance = handle.instance();
4949            Box::new(
4950                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
4951                    let store = token.as_context_mut(store);
4952                    // SAFETY: Per the contract of `prepare_call`, the callback
4953                    // will remain valid at least as long is this task exists.
4954                    unsafe {
4955                        instance.call_callback(
4956                            store,
4957                            runtime_instance,
4958                            callback,
4959                            event,
4960                            handle,
4961                            call_post_return_automatically,
4962                        )
4963                    }
4964                },
4965            ) as CallbackFn
4966        }),
4967        component_instance,
4968    )?;
4969    task.function_index = Some(handle.index());
4970
4971    let task = state.push(task)?;
4972    let thread = state.push(GuestThread::new_implicit(task))?;
4973    state.get_mut(task)?.threads.insert(thread);
4974
4975    Ok(PreparedCall {
4976        handle,
4977        thread: QualifiedThreadId { task, thread },
4978        param_count,
4979        rx,
4980        exit_rx,
4981        _phantom: PhantomData,
4982    })
4983}
4984
4985/// Queue a call previously prepared using `prepare_call` to be run as part of
4986/// the associated `ComponentInstance`'s event loop.
4987///
4988/// The returned future will resolve to the result once it is available, but
4989/// must only be polled via the instance's event loop. See
4990/// `StoreContextMut::run_concurrent` for details.
4991pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4992    mut store: StoreContextMut<T>,
4993    prepared: PreparedCall<R>,
4994) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4995    let PreparedCall {
4996        handle,
4997        thread,
4998        param_count,
4999        rx,
5000        exit_rx,
5001        ..
5002    } = prepared;
5003
5004    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5005
5006    Ok(checked(
5007        store.0.id(),
5008        rx.map(move |result| {
5009            result
5010                .map(|v| (*v.downcast().unwrap(), exit_rx))
5011                .map_err(anyhow::Error::from)
5012        }),
5013    ))
5014}
5015
5016/// Queue a call previously prepared using `prepare_call` to be run as part of
5017/// the associated `ComponentInstance`'s event loop.
5018fn queue_call0<T: 'static>(
5019    store: StoreContextMut<T>,
5020    handle: Func,
5021    guest_thread: QualifiedThreadId,
5022    param_count: usize,
5023) -> Result<()> {
5024    let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5025    let is_concurrent = raw_options.async_;
5026    let callback = raw_options.callback;
5027    let instance = handle.instance();
5028    let callee = handle.lifted_core_func(store.0);
5029    let post_return = handle.post_return_core_func(store.0);
5030    let callback = callback.map(|i| {
5031        let instance = instance.id().get(store.0);
5032        SendSyncPtr::new(instance.runtime_callback(i))
5033    });
5034
5035    log::trace!("queueing call {guest_thread:?}");
5036
5037    let instance_flags = if callback.is_none() {
5038        None
5039    } else {
5040        Some(flags)
5041    };
5042
5043    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5044    // (with signatures appropriate for this call) and will remain valid as
5045    // long as this instance is valid.
5046    unsafe {
5047        instance.queue_call(
5048            store,
5049            guest_thread,
5050            SendSyncPtr::new(callee),
5051            param_count,
5052            1,
5053            instance_flags,
5054            is_concurrent,
5055            callback,
5056            post_return.map(SendSyncPtr::new),
5057        )
5058    }
5059}