wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func, Options};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{
58    CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59};
60use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
61use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
62use anyhow::{Context as _, Result, anyhow, bail};
63use error_contexts::GlobalErrorContextRefCount;
64use futures::channel::oneshot;
65use futures::future::{self, Either, FutureExt};
66use futures::stream::{FuturesUnordered, StreamExt};
67use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
68use std::any::Any;
69use std::borrow::ToOwned;
70use std::boxed::Box;
71use std::cell::UnsafeCell;
72use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
73use std::fmt;
74use std::future::Future;
75use std::marker::PhantomData;
76use std::mem::{self, ManuallyDrop, MaybeUninit};
77use std::ops::DerefMut;
78use std::pin::{Pin, pin};
79use std::ptr::{self, NonNull};
80use std::slice;
81use std::sync::Arc;
82use std::task::{Context, Poll, Waker};
83use std::vec::Vec;
84use table::{TableDebug, TableId};
85use wasmtime_environ::Trap;
86use wasmtime_environ::component::{
87    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
88    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93
94pub use abort::JoinHandle;
95pub use futures_and_streams::{
96    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
97    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
98    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
99};
100pub(crate) use futures_and_streams::{
101    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
102};
103
104mod abort;
105mod error_contexts;
106mod futures_and_streams;
107pub(crate) mod table;
108pub(crate) mod tls;
109
/// Sentinel value (all 32 bits set) defined by the Component Model spec to
/// indicate that an async intrinsic (e.g. `future.write`) has not yet
/// completed.
const BLOCKED: u32 = u32::MAX;
113
/// Mirrors `CallState` in the upstream spec.
///
/// The explicit discriminants are the integer codes that [`Status::pack`]
/// places in the low bits of its result.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional `waitable` handle into the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// The status occupies the low 4 bits and the waitable (or zero when
    /// absent) occupies the upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics unless a waitable is supplied exactly when the status is
    /// something other than [`Status::Returned`], or if the waitable does not
    /// fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let code = self as u32;
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        code | (waitable << 4)
    }
}
137
138/// Corresponds to `EventCode` in the Component Model spec, plus related payload
139/// data.
140#[derive(Clone, Copy, Debug)]
141enum Event {
142    None,
143    Cancelled,
144    Subtask {
145        status: Status,
146    },
147    StreamRead {
148        code: ReturnCode,
149        pending: Option<(TypeStreamTableIndex, u32)>,
150    },
151    StreamWrite {
152        code: ReturnCode,
153        pending: Option<(TypeStreamTableIndex, u32)>,
154    },
155    FutureRead {
156        code: ReturnCode,
157        pending: Option<(TypeFutureTableIndex, u32)>,
158    },
159    FutureWrite {
160        code: ReturnCode,
161        pending: Option<(TypeFutureTableIndex, u32)>,
162    },
163}
164
165impl Event {
166    /// Lower this event to core Wasm integers for delivery to the guest.
167    ///
168    /// Note that the waitable handle, if any, is assumed to be lowered
169    /// separately.
170    fn parts(self) -> (u32, u32) {
171        const EVENT_NONE: u32 = 0;
172        const EVENT_SUBTASK: u32 = 1;
173        const EVENT_STREAM_READ: u32 = 2;
174        const EVENT_STREAM_WRITE: u32 = 3;
175        const EVENT_FUTURE_READ: u32 = 4;
176        const EVENT_FUTURE_WRITE: u32 = 5;
177        const EVENT_CANCELLED: u32 = 6;
178        match self {
179            Event::None => (EVENT_NONE, 0),
180            Event::Cancelled => (EVENT_CANCELLED, 0),
181            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
182            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
183            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
184            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
185            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
186        }
187    }
188}
189
/// Corresponds to `CallbackCode` in the spec.
///
/// The codes are exposed as plain `u32` constants rather than as an enum.
// NOTE(review): presumably these are the values a guest callback returns to
// tell the host how to proceed (exit, yield, wait on a set, poll a set) --
// confirm against the spec before relying on that reading.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
    pub const POLL: u32 = 3;
}
197
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
// Mirrors the `wasmtime_environ` constant of the same name, cast to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
202
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]).
///
/// Values of this type are handed to the closure passed to
/// [`Accessor::with`]; see that method for details.
// NOTE(review): this doc previously also promised access to "the component
// instance to which the current host task belongs", but this type holds only
// a store context and a data getter -- confirm that claim is obsolete.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access was created for.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's `T` to `D::Data<'_>`, used by `get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
212
213impl<'a, T, D> Access<'a, T, D>
214where
215    D: HasData + ?Sized,
216    T: 'static,
217{
218    /// Creates a new [`Access`] from its component parts.
219    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
220        Self { store, get_data }
221    }
222
223    /// Get mutable access to the store data.
224    pub fn data_mut(&mut self) -> &mut T {
225        self.store.data_mut()
226    }
227
228    /// Get mutable access to the store data.
229    pub fn get(&mut self) -> D::Data<'_> {
230        (self.get_data)(self.data_mut())
231    }
232
233    /// Spawn a background task.
234    ///
235    /// See [`Accessor::spawn`] for details.
236    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
237    where
238        T: 'static,
239    {
240        let accessor = Accessor {
241            get_data: self.get_data,
242            token: StoreToken::new(self.store.as_context_mut()),
243        };
244        self.store
245            .as_context_mut()
246            .spawn_with_accessor(accessor, task)
247    }
248}
249
250impl<'a, T, D> AsContext for Access<'a, T, D>
251where
252    D: HasData + ?Sized,
253    T: 'static,
254{
255    type Data = T;
256
257    fn as_context(&self) -> StoreContext<'_, T> {
258        self.store.as_context()
259    }
260}
261
262impl<'a, T, D> AsContextMut for Access<'a, T, D>
263where
264    D: HasData + ?Sized,
265    T: 'static,
266{
267    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
268        self.store.as_context_mut()
269    }
270}
271
272/// Provides scoped mutable access to store data in the context of a concurrent
273/// host task future.
274///
275/// This allows multiple host task futures to execute concurrently and access
276/// the store between (but not across) `await` points.
277///
278/// # Rationale
279///
280/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
281/// `D::Data<'_>`. The problem this is solving, however, is that it does not
282/// literally store these values. The basic problem is that when a concurrent
283/// host future is being polled it has access to `&mut T` (and the whole
284/// `Store`) but when it's not being polled it does not have access to these
285/// values. This reflects how the store is only ever polling one future at a
286/// time so the store is effectively being passed between futures.
287///
288/// Rust's `Future` trait, however, has no means of passing a `Store`
289/// temporarily between futures. The [`Context`](std::task::Context) type does
290/// not have the ability to attach arbitrary information to it at this time.
291/// This type, [`Accessor`], is used to bridge this expressivity gap.
292///
293/// The [`Accessor`] type here represents the ability to acquire, temporarily in
294/// a synchronous manner, the current store. The [`Accessor::with`] function
295/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
296/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument; instead it takes a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
299/// how the store is temporarily made available while a host future is being
300/// polled.
301///
302/// # Implementation
303///
304/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
305/// this type additionally doesn't even have a lifetime parameter. This is
306/// instead a representation of proof of the ability to acquire these while a
307/// future is being polled. Wasmtime will, when it polls a host future,
308/// configure ambient state such that the `Accessor` that a future closes over
309/// will work and be able to access the store.
310///
311/// This has a number of implications for users such as:
312///
313/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
314///   the lifetime of a single future.
315/// * A future is expected to, however, close over an `Accessor` and keep it
316///   alive probably for the duration of the entire future.
317/// * Different host futures will be given different `Accessor`s, and that's
318///   intentional.
319/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
320///   alleviates some otherwise required bounds to be written down.
321///
322/// # Using `Accessor` in `Drop`
323///
324/// The methods on `Accessor` are only expected to work in the context of
325/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
326/// host future can be dropped at any time throughout the system and Wasmtime
327/// store context is not necessarily available at that time. It's recommended to
328/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they may panic or have unintended results. If you run into this though
330/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by [`Accessor::with`] to turn the ambient store into a
    /// `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from `&mut T` to `D::Data<'_>`; copied into every
    /// [`Access`] this accessor creates.
    get_data: fn(&mut T) -> D::Data<'_>,
}
338
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and for `&T` where `T`
/// implements this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The accessor is only borrowed; no new accessor is created.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
366
367impl<T: AsAccessor + ?Sized> AsAccessor for &T {
368    type Data = T::Data;
369    type AccessorData = T::AccessorData;
370
371    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
372        T::as_accessor(self)
373    }
374}
375
376impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
377    type Data = T;
378    type AccessorData = D;
379
380    fn as_accessor(&self) -> &Accessor<T, D> {
381        self
382    }
383}
384
385// Note that it is intentional at this time that `Accessor` does not actually
386// store `&mut T` or anything similar. This distinctly enables the `Accessor`
387// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
388// that matter). This is used to ergonomically simplify bindings where the
389// majority of the time `Accessor` is closed over in a future which then needs
390// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
391// you already have to write `T: 'static`...) it helps to avoid this.
392//
393// Note as well that `Accessor` doesn't actually store its data at all. Instead
394// it's more of a "proof" of what can be accessed from TLS. API design around
395// `Accessor` and functions like `Linker::func_wrap_concurrent` are
396// intentionally made to ensure that `Accessor` is ideally only used in the
397// context that TLS variables are actually set. For example host functions are
398// given `&Accessor`, not `Accessor`, and this prevents them from persisting
399// the value outside of a future. Within the future the TLS variables are all
400// guaranteed to be set while the future is being polled.
401//
402// Finally though this is not an ironclad guarantee, but nor does it need to be.
403// The TLS APIs are designed to panic or otherwise model usage where they're
404// called recursively or similar. It's hoped that code cannot be constructed to
405// actually hit this at runtime but this is not a safety requirement at this
406// time.
407const _: () = {
408    const fn assert<T: Send + Sync>() {}
409    assert::<Accessor<UnsafeCell<u32>>>();
410};
411
412impl<T> Accessor<T> {
413    /// Creates a new `Accessor` backed by the specified functions.
414    ///
415    /// - `get`: used to retrieve the store
416    ///
417    /// - `get_data`: used to "project" from the store's associated data to
418    /// another type (e.g. a field of that data or a wrapper around it).
419    ///
420    /// - `spawn`: used to queue spawned background tasks to be run later
421    pub(crate) fn new(token: StoreToken<T>) -> Self {
422        Self {
423            token,
424            get_data: |x| x,
425        }
426    }
427}
428
429impl<T, D> Accessor<T, D>
430where
431    D: HasData + ?Sized,
432{
433    /// Run the specified closure, passing it mutable access to the store.
434    ///
435    /// This function is one of the main building blocks of the [`Accessor`]
436    /// type. This yields synchronous, blocking, access to store via an
437    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
438    /// providing the ability to access `D` via [`Access::get`]. Note that the
439    /// `fun` here is given only temporary access to the store and `T`/`D`
440    /// meaning that the return value `R` here is not allowed to capture borrows
441    /// into the two. If access is needed to data within `T` or `D` outside of
442    /// this closure then it must be `clone`d out, for example.
443    ///
444    /// # Panics
445    ///
446    /// This function will panic if it is call recursively with any other
447    /// accessor already in scope. For example if `with` is called within `fun`,
448    /// then this function will panic. It is up to the embedder to ensure that
449    /// this does not happen.
450    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
451        tls::get(|vmstore| {
452            fun(Access {
453                store: self.token.as_context_mut(vmstore),
454                get_data: self.get_data,
455            })
456        })
457    }
458
459    /// Returns the getter this accessor is using to project from `T` into
460    /// `D::Data`.
461    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
462        self.get_data
463    }
464
465    /// Changes this accessor to access `D2` instead of the current type
466    /// parameter `D`.
467    ///
468    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
469    ///
470    /// # Panics
471    ///
472    /// When using this API the returned value is disconnected from `&self` and
473    /// the lifetime binding the `self` argument. An `Accessor` only works
474    /// within the context of the closure or async closure that it was
475    /// originally given to, however. This means that due to the fact that the
476    /// returned value has no lifetime connection it's possible to use the
477    /// accessor outside of `&self`, the original accessor, and panic.
478    ///
479    /// The returned value should only be used within the scope of the original
480    /// `Accessor` that `self` refers to.
481    pub fn with_getter<D2: HasData>(
482        &self,
483        get_data: fn(&mut T) -> D2::Data<'_>,
484    ) -> Accessor<T, D2> {
485        Accessor {
486            token: self.token,
487            get_data,
488        }
489    }
490
491    /// Spawn a background task which will receive an `&Accessor<T, D>` and
492    /// run concurrently with any other tasks in progress for the current
493    /// store.
494    ///
495    /// This is particularly useful for host functions which return a `stream`
496    /// or `future` such that the code to write to the write end of that
497    /// `stream` or `future` must run after the function returns.
498    ///
499    /// The returned [`JoinHandle`] may be used to cancel the task.
500    ///
501    /// # Panics
502    ///
503    /// Panics if called within a closure provided to the [`Accessor::with`]
504    /// function. This can only be called outside an active invocation of
505    /// [`Accessor::with`].
506    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
507    where
508        T: 'static,
509    {
510        let accessor = self.clone_for_spawn();
511        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
512    }
513
514    fn clone_for_spawn(&self) -> Self {
515        Self {
516            token: self.token,
517            get_data: self.get_data,
518        }
519    }
520}
521
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and returns a `Send` future resolving to `R`. The
    /// `accessor` is only borrowed for the call.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
540
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
// NOTE(review): `params` presumably holds the caller's raw core Wasm
// arguments -- confirm against the code that constructs these values.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import.
    Async {
        params: Vec<ValRaw>,
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import.
    Sync {
        params: Vec<ValRaw>,
        result_count: u32,
    },
}
555
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`.
    // NOTE(review): presumably the payload is the suspended fiber to resume
    // once an event arrives -- confirm at the use sites.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback(Instance),
}
564
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    ///
    /// For example, `poll_and_block` suspends with the calling task's
    /// `sync_call_set` as `set` and the calling guest thread as `thread`.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding { thread: QualifiedThreadId },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or
    /// `thread.switch-to`.
    ExplicitlySuspending { thread: QualifiedThreadId },
}
583
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// `GuestCall::is_ready` holds such a call back while the target instance
    /// has `do_not_enter` set or has backpressure enabled.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
    /// Like `StartImplicit`, except that `GuestCall::is_ready` always reports
    /// the call as ready, regardless of `do_not_enter` or backpressure.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
602
603impl fmt::Debug for GuestCallKind {
604    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
605        match self {
606            Self::DeliverEvent { instance, set } => f
607                .debug_struct("DeliverEvent")
608                .field("instance", instance)
609                .field("set", set)
610                .finish(),
611            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
612            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
613        }
614    }
615}
616
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread the call is for; `is_ready` uses its `task` to find
    /// the instance whose state gates the call.
    thread: QualifiedThreadId,
    /// What kind of call is pending.
    kind: GuestCallKind,
}
623
624impl GuestCall {
625    /// Returns whether or not the call is ready to run.
626    ///
627    /// A call will not be ready to run if either:
628    ///
629    /// - the (sub-)component instance to be called has already been entered and
630    /// cannot be reentered until an in-progress call completes
631    ///
632    /// - the call is for a not-yet started task and the (sub-)component
633    /// instance to be called has backpressure enabled
634    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
635        let task_instance = state.get_mut(self.thread.task)?.instance;
636        let state = state.instance_state(task_instance);
637
638        let ready = match &self.kind {
639            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
640            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
641            GuestCallKind::StartExplicit(_) => true,
642        };
643        log::trace!(
644            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
645            state.do_not_enter,
646            state.backpressure
647        );
648        Ok(ready)
649    }
650}
651
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary one-shot closure that is given the store when run.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
657
/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
///
/// Carried by `WorkItem::Poll`.
#[derive(Debug)]
struct PollParams {
    /// The instance to which the polling thread belongs.
    instance: Instance,
    /// The polling thread.
    thread: QualifiedThreadId,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}
669
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber: a one-shot closure that is given the
    /// store when run.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
684
685impl fmt::Debug for WorkItem {
686    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
687        match self {
688            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
689            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
690            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
691            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
692            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
693        }
694    }
695}
696
/// Whether a suspension intrinsic was cancelled or completed.
///
/// The derived ordering follows declaration order, so `Cancelled` compares
/// less than `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
703
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// The future is polled once immediately. If it is still pending it is handed
/// to the event loop and the current fiber is suspended until the temporary
/// host task created here reports completion.
///
/// # Errors
///
/// Propagates any error produced by `future`, as well as errors from looking
/// up the calling task, creating or deleting the temporary host task, joining
/// it to the caller's `sync_call_set`, or suspending the fiber.
///
/// # Panics
///
/// Panics if no guest thread is current and `future` is not ready on its first
/// poll, or if, after completion, the caller's `GuestTask::result` is empty or
/// does not hold a value of type `R`.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // If there is no current guest thread set, that means the host function was
    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
    // should complete immediately.
    let Some(caller) = state.guest_thread else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                // Such a function is expected to never return `Pending`.
                unreachable!()
            }
        };
    };

    // Save any existing result stashed in `GuestTask::result` so we can replace
    // it with the new result.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Add a temporary host task into the table so we can track its progress.
    // Note that we'll never allocate a waitable handle for the guest since
    // we're being called synchronously.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            // The result is type-erased here and downcast back to `R` at the
            // end of `poll_and_block`.
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // It completed immediately; check the result and delete the task.
            //
            // NOTE(review): if `result` is an error we return here without
            // deleting `task` or restoring `old_result` -- confirm that is
            // acceptable (e.g. because such an error is fatal to the store).
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // It did not complete immediately; add it to
            // `ConcurrentState::futures` so it will be polled via the event
            // loop; then use `GuestTask::sync_call_set` to wait for the task to
            // complete, suspending the current fiber until it does so.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
            })?;
        }
    }

    // Retrieve and return the result, putting back whatever was stashed in
    // `GuestTask::result` before this call.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
808
809/// Execute the specified guest call.
810fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
811    match call.kind {
812        GuestCallKind::DeliverEvent { instance, set } => {
813            let (event, waitable) = instance
814                .get_event(store, call.thread.task, set, true)?
815                .unwrap();
816            let state = store.concurrent_state_mut();
817            let task = state.get_mut(call.thread.task)?;
818            let runtime_instance = task.instance;
819            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
820
821            log::trace!(
822                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
823                call.thread,
824            );
825
826            let old_thread = state.guest_thread.replace(call.thread);
827            log::trace!(
828                "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
829                call.thread
830            );
831
832            store.maybe_push_call_context(call.thread.task)?;
833
834            let state = store.concurrent_state_mut();
835            state.enter_instance(runtime_instance);
836
837            let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
838
839            let code = callback(store, runtime_instance, event, handle)?;
840
841            let state = store.concurrent_state_mut();
842
843            state.get_mut(call.thread.task)?.callback = Some(callback);
844            state.exit_instance(runtime_instance)?;
845
846            store.maybe_pop_call_context(call.thread.task)?;
847
848            instance.handle_callback_code(store, call.thread, runtime_instance, code, false)?;
849
850            store.concurrent_state_mut().guest_thread = old_thread;
851            log::trace!("GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread");
852        }
853        GuestCallKind::StartImplicit(fun) => {
854            fun(store)?;
855        }
856        GuestCallKind::StartExplicit(fun) => {
857            fun(store)?;
858        }
859    }
860
861    Ok(())
862}
863
864impl<T> Store<T> {
865    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
866    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
867    where
868        T: Send + 'static,
869    {
870        self.as_context_mut().run_concurrent(fun).await
871    }
872
873    #[doc(hidden)]
874    pub fn assert_concurrent_state_empty(&mut self) {
875        self.as_context_mut().assert_concurrent_state_empty();
876    }
877
878    /// Convenience wrapper for [`StoreContextMut::spawn`].
879    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
880    where
881        T: 'static,
882    {
883        self.as_context_mut().spawn(task)
884    }
885}
886
887impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked tables, queues, or maps is non-empty, or
    /// if a guest thread is still recorded as current.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        // Per-component guest tables are checked first.
        store
            .store_data_mut()
            .components
            .assert_guest_tables_empty();
        let state = store.concurrent_state_mut();
        // The shared table; its contents are printed on failure to aid
        // debugging.
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        // Neither work queue may have anything left in it.
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        // No guest thread may still be marked as current.
        assert!(state.guest_thread.is_none());
        // `futures` must be present (i.e. not currently taken out of the
        // state) and contain no pending futures.
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        // No instance may have calls still pending.
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }
921
922    /// Spawn a background task to run as part of this instance's event loop.
923    ///
924    /// The task will receive an `&Accessor<U>` and run concurrently with
925    /// any other tasks in progress for the instance.
926    ///
927    /// Note that the task will only make progress if and when the event loop
928    /// for this instance is run.
929    ///
930    /// The returned [`SpawnHandle`] may be used to cancel the task.
931    pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
932    where
933        T: 'static,
934    {
935        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
936        self.spawn_with_accessor(accessor, task)
937    }
938
939    /// Internal implementation of `spawn` functions where a `store` is
940    /// available along with an `Accessor`.
941    fn spawn_with_accessor<D>(
942        self,
943        accessor: Accessor<T, D>,
944        task: impl AccessorTask<T, D, Result<()>>,
945    ) -> JoinHandle
946    where
947        T: 'static,
948        D: HasData + ?Sized,
949    {
950        // Create an "abortable future" here where internally the future will
951        // hook calls to poll and possibly spawn more background tasks on each
952        // iteration.
953        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
954        self.0
955            .concurrent_state_mut()
956            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
957        handle
958    }
959
960    /// Run the specified closure `fun` to completion as part of this store's
961    /// event loop.
962    ///
963    /// This will run `fun` as part of this store's event loop until it
964    /// yields a result.  `fun` is provided an [`Accessor`], which provides
965    /// controlled access to the store and its data.
966    ///
967    /// This function can be used to invoke [`Func::call_concurrent`] for
968    /// example within the async closure provided here.
969    ///
970    /// # Store-blocking behavior
971    ///
972    ///
973    ///
974    /// At this time there are certain situations in which the `Future` returned
975    /// by the `AsyncFnOnce` passed to this function will not be polled for an
976    /// extended period of time, despite one or more `Waker::wake` events having
977    /// occurred for the task to which it belongs.  This can manifest as the
978    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
979    /// the `Store` being held by e.g. a blocking host function, preventing the
980    /// `Future` from being polled. A canonical example of this is when the
981    /// `fun` provided to this function attempts to set a timeout for an
982    /// invocation of a wasm function. In this situation the async closure is
983    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
984    /// to elapse. At this time this setup will not always work and the timeout
985    /// may not reliably fire.
986    ///
987    /// This function will not block the current thread and as such is always
988    /// suitable to run in an `async` context, but the current implementation of
989    /// Wasmtime can lead to situations where a certain wasm computation is
990    /// required to make progress the closure to make progress. This is an
991    /// artifact of Wasmtime's historical implementation of `async` functions
992    /// and is the topic of [#11869] and [#11870]. In the timeout example from
993    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
994    /// progress for a readiness notification of (b) to get delivered.
995    ///
996    /// This effectively means that it's not possible to reliably perform a
997    /// "select" operation within the `fun` closure, which timeouts for example
998    /// are based on. Fixing this requires some relatively major refactoring
999    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1000    /// that is intended to be fixed one day. In the meantime it's recommended
1001    /// to apply timeouts or such to the entire `run_concurrent` call itself
1002    /// rather than internally.
1003    ///
1004    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1005    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1006    ///
1007    /// # Example
1008    ///
1009    /// ```
1010    /// # use {
1011    /// #   anyhow::{Result},
1012    /// #   wasmtime::{
1013    /// #     component::{ Component, Linker, Resource, ResourceTable},
1014    /// #     Config, Engine, Store
1015    /// #   },
1016    /// # };
1017    /// #
1018    /// # struct MyResource(u32);
1019    /// # struct Ctx { table: ResourceTable }
1020    /// #
1021    /// # async fn foo() -> Result<()> {
1022    /// # let mut config = Config::new();
1023    /// # let engine = Engine::new(&config)?;
1024    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1025    /// # let mut linker = Linker::new(&engine);
1026    /// # let component = Component::new(&engine, "")?;
1027    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1028    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1029    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1030    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1031    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1032    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1033    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1034    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1035    ///    Ok(())
1036    /// }).await??;
1037    /// # Ok(())
1038    /// # }
1039    /// ```
1040    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1041    where
1042        T: Send + 'static,
1043    {
1044        self.do_run_concurrent(fun, false).await
1045    }
1046
1047    pub(super) async fn run_concurrent_trap_on_idle<R>(
1048        self,
1049        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1050    ) -> Result<R>
1051    where
1052        T: Send + 'static,
1053    {
1054        self.do_run_concurrent(fun, true).await
1055    }
1056
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` and drives the resulting future via
    /// `poll_until`, forwarding `trap_on_idle` to it.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects nested `run_concurrent` calls; see
        // the definition of `check_recursive_run` to confirm.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        /// Owns the store context together with the future produced by `fun`
        /// so that the future is dropped inside a `tls::set` scope, i.e. with
        /// the store available to its destructor via `tls`.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        // `fun(accessor)` only creates the future here; it is first polled by
        // `poll_until` below.
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1099
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration of the loop polls, in order: `future`, the store's
    /// `ConcurrentState::futures`, the high-priority work queue, and finally
    /// the low-priority work queue.  Any work items found are then handled one
    /// at a time via `handle_work_item`.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the futures in `ConcurrentState::futures`
    /// fails, if handling a work item fails, or -- when `trap_on_idle` is true
    /// -- with `Trap::AsyncDeadlock` once nothing is left to run while `future`
    /// is still pending.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        /// Guard which puts the temporarily-removed
        /// `ConcurrentState::futures` back into the store when dropped.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // The outcome of one round of polling: `Either::Left` carries the
            // output of `future`, `Either::Right` carries the (possibly empty)
            // batch of work items to handle before looping again.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The result is `Ready(true)` if one of them completed
                // successfully, `Ready(false)` if there are none left, and
                // `Pending` otherwise.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    /// Guard which cleans up any work items that were never
                    /// handled, e.g. because handling an earlier item failed
                    /// or this future was dropped part-way through the batch.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    // Fibers are explicitly disposed with
                                    // access to the store.
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Futures are dropped inside a `tls::set`
                                    // scope so the store is available to them
                                    // via `tls` while they are dropped.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Items are pulled from `dispose.ready` one at a time so
                    // that whatever remains is still owned by `dispose` if we
                    // bail out early.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1267
1268    /// Handle the specified work item, possibly resuming a fiber if applicable.
1269    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1270    where
1271        T: Send,
1272    {
1273        log::trace!("handle work item {item:?}");
1274        match item {
1275            WorkItem::PushFuture(future) => {
1276                self.0
1277                    .concurrent_state_mut()
1278                    .futures
1279                    .get_mut()
1280                    .as_mut()
1281                    .unwrap()
1282                    .push(future.into_inner());
1283            }
1284            WorkItem::ResumeFiber(fiber) => {
1285                self.0.resume_fiber(fiber).await?;
1286            }
1287            WorkItem::GuestCall(call) => {
1288                let state = self.0.concurrent_state_mut();
1289                if call.is_ready(state)? {
1290                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1291                } else {
1292                    let task = state.get_mut(call.thread.task)?;
1293                    if !task.starting_sent {
1294                        task.starting_sent = true;
1295                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1296                            Waitable::Guest(call.thread.task).set_event(
1297                                state,
1298                                Some(Event::Subtask {
1299                                    status: Status::Starting,
1300                                }),
1301                            )?;
1302                        }
1303                    }
1304
1305                    let runtime_instance = state.get_mut(call.thread.task)?.instance;
1306                    state
1307                        .instance_state(runtime_instance)
1308                        .pending
1309                        .insert(call.thread, call.kind);
1310                }
1311            }
1312            WorkItem::Poll(params) => {
1313                let state = self.0.concurrent_state_mut();
1314                if state.get_mut(params.thread.task)?.event.is_some()
1315                    || !state.get_mut(params.set)?.ready.is_empty()
1316                {
1317                    // There's at least one event immediately available; deliver
1318                    // it to the guest ASAP.
1319                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1320                        thread: params.thread,
1321                        kind: GuestCallKind::DeliverEvent {
1322                            instance: params.instance,
1323                            set: Some(params.set),
1324                        },
1325                    }));
1326                } else {
1327                    // There are no events immediately available; deliver
1328                    // `Event::None` to the guest.
1329                    state.get_mut(params.thread.task)?.event = Some(Event::None);
1330                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1331                        thread: params.thread,
1332                        kind: GuestCallKind::DeliverEvent {
1333                            instance: params.instance,
1334                            set: Some(params.set),
1335                        },
1336                    }));
1337                }
1338            }
1339            WorkItem::WorkerFunction(fun) => {
1340                self.run_on_worker(WorkerItem::Function(fun)).await?;
1341            }
1342        }
1343
1344        Ok(())
1345    }
1346
1347    /// Execute the specified guest call on a worker fiber.
1348    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1349    where
1350        T: Send,
1351    {
1352        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1353            fiber
1354        } else {
1355            fiber::make_fiber(self.0, move |store| {
1356                loop {
1357                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1358                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1359                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1360                    }
1361
1362                    store.suspend(SuspendReason::NeedWork)?;
1363                }
1364            })?
1365        };
1366
1367        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1368        assert!(worker_item.is_none());
1369        *worker_item = Some(item);
1370
1371        self.0.resume_fiber(worker).await
1372    }
1373
1374    /// Wrap the specified host function in a future which will call it, passing
1375    /// it an `&Accessor<T>`.
1376    ///
1377    /// See the `Accessor` documentation for details.
1378    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1379    where
1380        T: 'static,
1381        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1382            + Send
1383            + Sync
1384            + 'static,
1385        R: Send + Sync + 'static,
1386    {
1387        let token = StoreToken::new(self);
1388        async move {
1389            let mut accessor = Accessor::new(token);
1390            closure(&mut accessor).await
1391        }
1392    }
1393}
1394
1395impl StoreOpaque {
1396    /// Resume the specified fiber, giving it exclusive access to the specified
1397    /// store.
1398    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1399        let old_thread = self.concurrent_state_mut().guest_thread;
1400        log::trace!("resume_fiber: save current thread {old_thread:?}");
1401
1402        let fiber = fiber::resolve_or_release(self, fiber).await?;
1403
1404        let state = self.concurrent_state_mut();
1405
1406        state.guest_thread = old_thread;
1407        if let Some(ref ot) = old_thread {
1408            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1409        }
1410        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1411
1412        if let Some(mut fiber) = fiber {
1413            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1414            // See the `SuspendReason` documentation for what each case means.
1415            match state.suspend_reason.take().unwrap() {
1416                SuspendReason::NeedWork => {
1417                    if state.worker.is_none() {
1418                        state.worker = Some(fiber);
1419                    } else {
1420                        fiber.dispose(self);
1421                    }
1422                }
1423                SuspendReason::Yielding { thread, .. } => {
1424                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1425                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1426                }
1427                SuspendReason::ExplicitlySuspending { thread, .. } => {
1428                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1429                }
1430                SuspendReason::Waiting { set, thread } => {
1431                    let old = state
1432                        .get_mut(set)?
1433                        .waiting
1434                        .insert(thread, WaitMode::Fiber(fiber));
1435                    assert!(old.is_none());
1436                }
1437            };
1438        } else {
1439            log::trace!("resume_fiber: fiber has exited");
1440        }
1441
1442        Ok(())
1443    }
1444
1445    /// Suspend the current fiber, storing the reason in
1446    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1447    /// it should be resumed.
1448    ///
1449    /// See the `SuspendReason` documentation for details.
1450    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1451        log::trace!("suspend fiber: {reason:?}");
1452
1453        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1454        // pop the call context which manages resource borrows before suspending
1455        // and then push it again once we've resumed.
1456        let task = match &reason {
1457            SuspendReason::Yielding { thread, .. }
1458            | SuspendReason::Waiting { thread, .. }
1459            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1460            SuspendReason::NeedWork => None,
1461        };
1462
1463        let old_guest_thread = if let Some(task) = task {
1464            self.maybe_pop_call_context(task)?;
1465            self.concurrent_state_mut().guest_thread
1466        } else {
1467            None
1468        };
1469
1470        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1471        assert!(suspend_reason.is_none());
1472        *suspend_reason = Some(reason);
1473
1474        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1475
1476        if let Some(task) = task {
1477            self.concurrent_state_mut().guest_thread = old_guest_thread;
1478            self.maybe_push_call_context(task)?;
1479        }
1480
1481        Ok(())
1482    }
1483
1484    /// Push the call context for managing resource borrows for the specified
1485    /// guest task if it has not yet either returned a result or cancelled
1486    /// itself.
1487    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1488        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1489
1490        if !task.returned_or_cancelled() {
1491            log::trace!("push call context for {guest_task:?}");
1492            let call_context = task.call_context.take().unwrap();
1493            self.component_resource_state().0.push(call_context);
1494        }
1495        Ok(())
1496    }
1497
1498    /// Pop the call context for managing resource borrows for the specified
1499    /// guest task if it has not yet either returned a result or cancelled
1500    /// itself.
1501    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1502        if !self
1503            .concurrent_state_mut()
1504            .get_mut(guest_task)?
1505            .returned_or_cancelled()
1506        {
1507            log::trace!("pop call context for {guest_task:?}");
1508            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1509            self.concurrent_state_mut()
1510                .get_mut(guest_task)?
1511                .call_context = call_context;
1512        }
1513        Ok(())
1514    }
1515
1516    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1517        let state = self.concurrent_state_mut();
1518        let caller = state.guest_thread.unwrap();
1519        let old_set = waitable.common(state)?.set;
1520        let set = state.get_mut(caller.task)?.sync_call_set;
1521        waitable.join(state, Some(set))?;
1522        self.suspend(SuspendReason::Waiting {
1523            set,
1524            thread: caller,
1525        })?;
1526        let state = self.concurrent_state_mut();
1527        waitable.join(state, old_set)
1528    }
1529}
1530
1531impl Instance {
1532    /// Get the next pending event for the specified task and (optional)
1533    /// waitable set, along with the waitable handle if applicable.
1534    fn get_event(
1535        self,
1536        store: &mut StoreOpaque,
1537        guest_task: TableId<GuestTask>,
1538        set: Option<TableId<WaitableSet>>,
1539        cancellable: bool,
1540    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1541        let state = store.concurrent_state_mut();
1542
1543        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1544            log::trace!("deliver event {event:?} to {guest_task:?}");
1545
1546            if cancellable || !matches!(event, Event::Cancelled) {
1547                return Ok(Some((event, None)));
1548            } else {
1549                state.get_mut(guest_task)?.event = Some(event);
1550            }
1551        }
1552
1553        Ok(
1554            if let Some((set, waitable)) = set
1555                .and_then(|set| {
1556                    state
1557                        .get_mut(set)
1558                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1559                        .transpose()
1560                })
1561                .transpose()?
1562            {
1563                let common = waitable.common(state)?;
1564                let handle = common.handle.unwrap();
1565                let event = common.event.take().unwrap();
1566
1567                log::trace!(
1568                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1569                );
1570
1571                waitable.on_delivery(self.id().get_mut(store), event);
1572
1573                Some((event, Some((waitable, handle))))
1574            } else {
1575                None
1576            },
1577        )
1578    }
1579
1580    /// Handle the `CallbackCode` returned from an async-lifted export or its
1581    /// callback.
1582    ///
1583    /// If `initial_call` is `true`, then the code was received from the
1584    /// async-lifted export; otherwise, it was received from its callback.
1585    fn handle_callback_code(
1586        self,
1587        store: &mut StoreOpaque,
1588        guest_thread: QualifiedThreadId,
1589        runtime_instance: RuntimeComponentInstanceIndex,
1590        code: u32,
1591        initial_call: bool,
1592    ) -> Result<()> {
1593        let (code, set) = unpack_callback_code(code);
1594
1595        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1596
1597        let state = store.concurrent_state_mut();
1598        if !state.get_mut(guest_thread.task)?.returned_or_cancelled() {
1599            if initial_call {
1600                // Notify any current or future waiters that this subtask has
1601                // started.
1602                Waitable::Guest(guest_thread.task).set_event(
1603                    state,
1604                    Some(Event::Subtask {
1605                        status: Status::Started,
1606                    }),
1607                )?;
1608            }
1609        }
1610
1611        let get_set = |store, handle| {
1612            if handle == 0 {
1613                bail!("invalid waitable-set handle");
1614            }
1615
1616            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1617                .waitable_set_rep(handle)?;
1618
1619            Ok(TableId::<WaitableSet>::new(set))
1620        };
1621
1622        match code {
1623            callback_code::EXIT => {
1624                log::trace!("implicit thread {guest_thread:?} completed");
1625                self.cleanup_thread(store, guest_thread, runtime_instance)?;
1626                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1627                if task.threads.is_empty() && !task.returned_or_cancelled() {
1628                    bail!(Trap::NoAsyncResult);
1629                }
1630                match &task.caller {
1631                    Caller::Host { .. } => {
1632                        if task.ready_to_delete() {
1633                            Waitable::Guest(guest_thread.task)
1634                                .delete_from(store.concurrent_state_mut())?;
1635                        }
1636                    }
1637                    Caller::Guest { .. } => {
1638                        task.exited = true;
1639                        task.callback = None;
1640                    }
1641                }
1642            }
1643            callback_code::YIELD => {
1644                // Push this thread onto the "low priority" queue so it runs after
1645                // any other threads have had a chance to run.
1646                let task = state.get_mut(guest_thread.task)?;
1647                assert!(task.event.is_none());
1648                task.event = Some(Event::None);
1649                state.push_low_priority(WorkItem::GuestCall(GuestCall {
1650                    thread: guest_thread,
1651                    kind: GuestCallKind::DeliverEvent {
1652                        instance: self,
1653                        set: None,
1654                    },
1655                }));
1656            }
1657            callback_code::WAIT | callback_code::POLL => {
1658                let set = get_set(store, set)?;
1659                let state = store.concurrent_state_mut();
1660
1661                if state.get_mut(guest_thread.task)?.event.is_some()
1662                    || !state.get_mut(set)?.ready.is_empty()
1663                {
1664                    // An event is immediately available; deliver it ASAP.
1665                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1666                        thread: guest_thread,
1667                        kind: GuestCallKind::DeliverEvent {
1668                            instance: self,
1669                            set: Some(set),
1670                        },
1671                    }));
1672                } else {
1673                    // No event is immediately available.
1674                    match code {
1675                        callback_code::POLL => {
1676                            // We're polling, so just yield and check whether an
1677                            // event has arrived after that.
1678                            state.push_low_priority(WorkItem::Poll(PollParams {
1679                                instance: self,
1680                                thread: guest_thread,
1681                                set,
1682                            }));
1683                        }
1684                        callback_code::WAIT => {
1685                            // We're waiting, so register to be woken up when an
1686                            // event is published for this waitable set.
1687                            //
1688                            // Here we also set `GuestTask::wake_on_cancel`
1689                            // which allows `subtask.cancel` to interrupt the
1690                            // wait.
1691                            let old = state
1692                                .get_mut(guest_thread.thread)?
1693                                .wake_on_cancel
1694                                .replace(set);
1695                            assert!(old.is_none());
1696                            let old = state
1697                                .get_mut(set)?
1698                                .waiting
1699                                .insert(guest_thread, WaitMode::Callback(self));
1700                            assert!(old.is_none());
1701                        }
1702                        _ => unreachable!(),
1703                    }
1704                }
1705            }
1706            _ => bail!("unsupported callback code: {code}"),
1707        }
1708
1709        Ok(())
1710    }
1711
1712    fn cleanup_thread(
1713        self,
1714        store: &mut StoreOpaque,
1715        guest_thread: QualifiedThreadId,
1716        runtime_instance: RuntimeComponentInstanceIndex,
1717    ) -> Result<()> {
1718        let guest_id = store
1719            .concurrent_state_mut()
1720            .get_mut(guest_thread.thread)?
1721            .instance_rep;
1722        self.id().get_mut(store).guest_tables().0[runtime_instance]
1723            .guest_thread_remove(guest_id.unwrap())?;
1724
1725        store.concurrent_state_mut().delete(guest_thread.thread)?;
1726        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1727        task.threads.remove(&guest_thread.thread);
1728        Ok(())
1729    }
1730
1731    /// Add the specified guest call to the "high priority" work item queue, to
1732    /// be started as soon as backpressure and/or reentrance rules allow.
1733    ///
1734    /// SAFETY: The raw pointer arguments must be valid references to guest
1735    /// functions (with the appropriate signatures) when the closures queued by
1736    /// this function are called.
1737    unsafe fn queue_call<T: 'static>(
1738        self,
1739        mut store: StoreContextMut<T>,
1740        guest_thread: QualifiedThreadId,
1741        callee: SendSyncPtr<VMFuncRef>,
1742        param_count: usize,
1743        result_count: usize,
1744        flags: Option<InstanceFlags>,
1745        async_: bool,
1746        callback: Option<SendSyncPtr<VMFuncRef>>,
1747        post_return: Option<SendSyncPtr<VMFuncRef>>,
1748    ) -> Result<()> {
1749        /// Return a closure which will call the specified function in the scope
1750        /// of the specified task.
1751        ///
1752        /// This will use `GuestTask::lower_params` to lower the parameters, but
1753        /// will not lift the result; instead, it returns a
1754        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1755        /// any, may be lifted.  Note that an async-lifted export will have
1756        /// returned its result using the `task.return` intrinsic (or not
1757        /// returned a result at all, in the case of `task.cancel`), in which
1758        /// case the "result" of this call will either be a callback code or
1759        /// nothing.
1760        ///
1761        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1762        /// the returned closure is called.
1763        unsafe fn make_call<T: 'static>(
1764            store: StoreContextMut<T>,
1765            guest_thread: QualifiedThreadId,
1766            callee: SendSyncPtr<VMFuncRef>,
1767            param_count: usize,
1768            result_count: usize,
1769            flags: Option<InstanceFlags>,
1770        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1771        + Send
1772        + Sync
1773        + 'static
1774        + use<T> {
1775            let token = StoreToken::new(store);
1776            move |store: &mut dyn VMStore| {
1777                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1778
1779                store
1780                    .concurrent_state_mut()
1781                    .get_mut(guest_thread.thread)?
1782                    .state = GuestThreadState::Running;
1783                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1784                let may_enter_after_call = task.call_post_return_automatically();
1785                let lower = task.lower_params.take().unwrap();
1786
1787                lower(store, &mut storage[..param_count])?;
1788
1789                let mut store = token.as_context_mut(store);
1790
1791                // SAFETY: Per the contract documented in `make_call's`
1792                // documentation, `callee` must be a valid pointer.
1793                unsafe {
1794                    if let Some(mut flags) = flags {
1795                        flags.set_may_enter(false);
1796                    }
1797                    crate::Func::call_unchecked_raw(
1798                        &mut store,
1799                        callee.as_non_null(),
1800                        NonNull::new(
1801                            &mut storage[..param_count.max(result_count)]
1802                                as *mut [MaybeUninit<ValRaw>] as _,
1803                        )
1804                        .unwrap(),
1805                    )?;
1806                    if let Some(mut flags) = flags {
1807                        flags.set_may_enter(may_enter_after_call);
1808                    }
1809                }
1810
1811                Ok(storage)
1812            }
1813        }
1814
1815        // SAFETY: Per the contract described in this function documentation,
1816        // the `callee` pointer which `call` closes over must be valid when
1817        // called by the closure we queue below.
1818        let call = unsafe {
1819            make_call(
1820                store.as_context_mut(),
1821                guest_thread,
1822                callee,
1823                param_count,
1824                result_count,
1825                flags,
1826            )
1827        };
1828
1829        let callee_instance = store
1830            .0
1831            .concurrent_state_mut()
1832            .get_mut(guest_thread.task)?
1833            .instance;
1834        let fun = if callback.is_some() {
1835            assert!(async_);
1836
1837            Box::new(move |store: &mut dyn VMStore| {
1838                self.add_guest_thread_to_instance_table(
1839                    guest_thread.thread,
1840                    store,
1841                    callee_instance,
1842                )?;
1843                let old_thread = store
1844                    .concurrent_state_mut()
1845                    .guest_thread
1846                    .replace(guest_thread);
1847                log::trace!(
1848                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1849                );
1850
1851                store.maybe_push_call_context(guest_thread.task)?;
1852
1853                store.concurrent_state_mut().enter_instance(callee_instance);
1854
1855                // SAFETY: See the documentation for `make_call` to review the
1856                // contract we must uphold for `call` here.
1857                //
1858                // Per the contract described in the `queue_call`
1859                // documentation, the `callee` pointer which `call` closes
1860                // over must be valid.
1861                let storage = call(store)?;
1862
1863                store
1864                    .concurrent_state_mut()
1865                    .exit_instance(callee_instance)?;
1866
1867                store.maybe_pop_call_context(guest_thread.task)?;
1868
1869                let state = store.concurrent_state_mut();
1870                state.guest_thread = old_thread;
1871                old_thread
1872                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1873                log::trace!("stackless call: restored {old_thread:?} as current thread");
1874
1875                // SAFETY: `wasmparser` will have validated that the callback
1876                // function returns a `i32` result.
1877                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1878
1879                self.handle_callback_code(store, guest_thread, callee_instance, code, true)?;
1880
1881                Ok(())
1882            }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1883        } else {
1884            let token = StoreToken::new(store.as_context_mut());
1885            Box::new(move |store: &mut dyn VMStore| {
1886                self.add_guest_thread_to_instance_table(
1887                    guest_thread.thread,
1888                    store,
1889                    callee_instance,
1890                )?;
1891                let old_thread = store
1892                    .concurrent_state_mut()
1893                    .guest_thread
1894                    .replace(guest_thread);
1895                log::trace!(
1896                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1897                );
1898                let mut flags = self.id().get(store).instance_flags(callee_instance);
1899
1900                store.maybe_push_call_context(guest_thread.task)?;
1901
1902                // Unless this is a callback-less (i.e. stackful)
1903                // async-lifted export, we need to record that the instance
1904                // cannot be entered until the call returns.
1905                if !async_ {
1906                    store.concurrent_state_mut().enter_instance(callee_instance);
1907                }
1908
1909                // SAFETY: See the documentation for `make_call` to review the
1910                // contract we must uphold for `call` here.
1911                //
1912                // Per the contract described in the `queue_call`
1913                // documentation, the `callee` pointer which `call` closes
1914                // over must be valid.
1915                let storage = call(store)?;
1916
1917                // This is a callback-less call, so the implicit thread has now completed
1918                self.cleanup_thread(store, guest_thread, callee_instance)?;
1919
1920                if async_ {
1921                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1922                    if task.threads.is_empty() && !task.returned_or_cancelled() {
1923                        bail!(Trap::NoAsyncResult);
1924                    }
1925                } else {
1926                    // This is a sync-lifted export, so now is when we lift the
1927                    // result, optionally call the post-return function, if any,
1928                    // and finally notify any current or future waiters that the
1929                    // subtask has returned.
1930
1931                    let lift = {
1932                        let state = store.concurrent_state_mut();
1933                        state.exit_instance(callee_instance)?;
1934
1935                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
1936
1937                        state
1938                            .get_mut(guest_thread.task)?
1939                            .lift_result
1940                            .take()
1941                            .unwrap()
1942                    };
1943
1944                    // SAFETY: `result_count` represents the number of core Wasm
1945                    // results returned, per `wasmparser`.
1946                    let result = (lift.lift)(store, unsafe {
1947                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1948                            &storage[..result_count],
1949                        )
1950                    })?;
1951
1952                    let post_return_arg = match result_count {
1953                        0 => ValRaw::i32(0),
1954                        // SAFETY: `result_count` represents the number of
1955                        // core Wasm results returned, per `wasmparser`.
1956                        1 => unsafe { storage[0].assume_init() },
1957                        _ => unreachable!(),
1958                    };
1959
1960                    if store
1961                        .concurrent_state_mut()
1962                        .get_mut(guest_thread.task)?
1963                        .call_post_return_automatically()
1964                    {
1965                        unsafe {
1966                            flags.set_may_leave(false);
1967                            flags.set_needs_post_return(false);
1968                        }
1969
1970                        if let Some(func) = post_return {
1971                            let mut store = token.as_context_mut(store);
1972
1973                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1974                            // either `wasmtime-cranelift`-generated fused adapter
1975                            // code or `component::Options`.  Per `wasmparser`
1976                            // post-return signature validation, we know it takes a
1977                            // single parameter.
1978                            unsafe {
1979                                crate::Func::call_unchecked_raw(
1980                                    &mut store,
1981                                    func.as_non_null(),
1982                                    slice::from_ref(&post_return_arg).into(),
1983                                )?;
1984                            }
1985                        }
1986
1987                        unsafe {
1988                            flags.set_may_leave(true);
1989                            flags.set_may_enter(true);
1990                        }
1991                    }
1992
1993                    self.task_complete(
1994                        store,
1995                        guest_thread.task,
1996                        result,
1997                        Status::Returned,
1998                        post_return_arg,
1999                    )?;
2000                }
2001
2002                store.maybe_pop_call_context(guest_thread.task)?;
2003
2004                let state = store.concurrent_state_mut();
2005                let task = state.get_mut(guest_thread.task)?;
2006
2007                match &task.caller {
2008                    Caller::Host { .. } => {
2009                        if task.ready_to_delete() {
2010                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2011                        }
2012                    }
2013                    Caller::Guest { .. } => {
2014                        task.exited = true;
2015                    }
2016                }
2017
2018                Ok(())
2019            })
2020        };
2021
2022        store
2023            .0
2024            .concurrent_state_mut()
2025            .push_high_priority(WorkItem::GuestCall(GuestCall {
2026                thread: guest_thread,
2027                kind: GuestCallKind::StartImplicit(fun),
2028            }));
2029
2030        Ok(())
2031    }
2032
2033    /// Prepare (but do not start) a guest->guest call.
2034    ///
2035    /// This is called from fused adapter code generated in
2036    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2037    /// are synthesized Wasm functions which move the parameters from the caller
2038    /// to the callee and the result from the callee to the caller,
2039    /// respectively.  The adapter will call `Self::start_call` immediately
2040    /// after calling this function.
2041    ///
2042    /// SAFETY: All the pointer arguments must be valid pointers to guest
2043    /// entities (and with the expected signatures for the function references
2044    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2045    unsafe fn prepare_call<T: 'static>(
2046        self,
2047        mut store: StoreContextMut<T>,
2048        start: *mut VMFuncRef,
2049        return_: *mut VMFuncRef,
2050        caller_instance: RuntimeComponentInstanceIndex,
2051        callee_instance: RuntimeComponentInstanceIndex,
2052        task_return_type: TypeTupleIndex,
2053        memory: *mut VMMemoryDefinition,
2054        string_encoding: u8,
2055        caller_info: CallerInfo,
2056    ) -> Result<()> {
2057        self.id().get(store.0).check_may_leave(caller_instance)?;
2058
2059        enum ResultInfo {
2060            Heap { results: u32 },
2061            Stack { result_count: u32 },
2062        }
2063
2064        let result_info = match &caller_info {
2065            CallerInfo::Async {
2066                has_result: true,
2067                params,
2068            } => ResultInfo::Heap {
2069                results: params.last().unwrap().get_u32(),
2070            },
2071            CallerInfo::Async {
2072                has_result: false, ..
2073            } => ResultInfo::Stack { result_count: 0 },
2074            CallerInfo::Sync {
2075                result_count,
2076                params,
2077            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2078                results: params.last().unwrap().get_u32(),
2079            },
2080            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2081                result_count: *result_count,
2082            },
2083        };
2084
2085        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2086
2087        // Create a new guest task for the call, closing over the `start` and
2088        // `return_` functions to lift the parameters and lower the result,
2089        // respectively.
2090        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2091        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2092        let token = StoreToken::new(store.as_context_mut());
2093        let state = store.0.concurrent_state_mut();
2094        let old_thread = state.guest_thread.take();
2095        let new_task = GuestTask::new(
2096            state,
2097            Box::new(move |store, dst| {
2098                let mut store = token.as_context_mut(store);
2099                assert!(dst.len() <= MAX_FLAT_PARAMS);
2100                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2101                let count = match caller_info {
2102                    // Async callers, if they have a result, use the last
2103                    // parameter as a return pointer so chop that off if
2104                    // relevant here.
2105                    CallerInfo::Async { params, has_result } => {
2106                        let params = &params[..params.len() - usize::from(has_result)];
2107                        for (param, src) in params.iter().zip(&mut src) {
2108                            src.write(*param);
2109                        }
2110                        params.len()
2111                    }
2112
2113                    // Sync callers forward everything directly.
2114                    CallerInfo::Sync { params, .. } => {
2115                        for (param, src) in params.iter().zip(&mut src) {
2116                            src.write(*param);
2117                        }
2118                        params.len()
2119                    }
2120                };
2121                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2122                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2123                // how it was constructed (see
2124                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2125                // for details) we know it takes count parameters and returns
2126                // `dst.len()` results.
2127                unsafe {
2128                    crate::Func::call_unchecked_raw(
2129                        &mut store,
2130                        start.as_non_null(),
2131                        NonNull::new(
2132                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2133                        )
2134                        .unwrap(),
2135                    )?;
2136                }
2137                dst.copy_from_slice(&src[..dst.len()]);
2138                let state = store.0.concurrent_state_mut();
2139                Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2140                    state,
2141                    Some(Event::Subtask {
2142                        status: Status::Started,
2143                    }),
2144                )?;
2145                Ok(())
2146            }),
2147            LiftResult {
2148                lift: Box::new(move |store, src| {
2149                    // SAFETY: See comment in closure passed as `lower_params`
2150                    // parameter above.
2151                    let mut store = token.as_context_mut(store);
2152                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2153                    if let ResultInfo::Heap { results } = &result_info {
2154                        my_src.push(ValRaw::u32(*results));
2155                    }
2156                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2157                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2158                    // on how it was constructed (see
2159                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2160                    // for details) we know it takes `src.len()` parameters and
2161                    // returns up to 1 result.
2162                    unsafe {
2163                        crate::Func::call_unchecked_raw(
2164                            &mut store,
2165                            return_.as_non_null(),
2166                            my_src.as_mut_slice().into(),
2167                        )?;
2168                    }
2169                    let state = store.0.concurrent_state_mut();
2170                    let thread = state.guest_thread.unwrap();
2171                    if sync_caller {
2172                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2173                            if let ResultInfo::Stack { result_count } = &result_info {
2174                                match result_count {
2175                                    0 => None,
2176                                    1 => Some(my_src[0]),
2177                                    _ => unreachable!(),
2178                                }
2179                            } else {
2180                                None
2181                            },
2182                        );
2183                    }
2184                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2185                }),
2186                ty: task_return_type,
2187                memory: NonNull::new(memory).map(SendSyncPtr::new),
2188                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2189            },
2190            Caller::Guest {
2191                thread: old_thread.unwrap(),
2192                instance: caller_instance,
2193            },
2194            None,
2195            callee_instance,
2196        )?;
2197
2198        let guest_task = state.push(new_task)?;
2199        let new_thread = GuestThread::new_implicit(guest_task);
2200        let guest_thread = state.push(new_thread)?;
2201        state.get_mut(guest_task)?.threads.insert(guest_thread);
2202
2203        let state = store.0.concurrent_state_mut();
2204        if let Some(old_thread) = old_thread {
2205            if !state.may_enter(guest_task) {
2206                bail!(crate::Trap::CannotEnterComponent);
2207            }
2208
2209            state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2210        };
2211
2212        // Make the new thread the current one so that `Self::start_call` knows
2213        // which one to start.
2214        state.guest_thread = Some(QualifiedThreadId {
2215            task: guest_task,
2216            thread: guest_thread,
2217        });
2218        log::trace!(
2219            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2220        );
2221
2222        Ok(())
2223    }
2224
2225    /// Call the specified callback function for an async-lifted export.
2226    ///
2227    /// SAFETY: `function` must be a valid reference to a guest function of the
2228    /// correct signature for a callback.
2229    unsafe fn call_callback<T>(
2230        self,
2231        mut store: StoreContextMut<T>,
2232        callee_instance: RuntimeComponentInstanceIndex,
2233        function: SendSyncPtr<VMFuncRef>,
2234        event: Event,
2235        handle: u32,
2236        may_enter_after_call: bool,
2237    ) -> Result<u32> {
2238        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2239
2240        let (ordinal, result) = event.parts();
2241        let params = &mut [
2242            ValRaw::u32(ordinal),
2243            ValRaw::u32(handle),
2244            ValRaw::u32(result),
2245        ];
2246        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2247        // `wasmtime-cranelift`-generated fused adapter code or
2248        // `component::Options`.  Per `wasmparser` callback signature
2249        // validation, we know it takes three parameters and returns one.
2250        unsafe {
2251            flags.set_may_enter(false);
2252            crate::Func::call_unchecked_raw(
2253                &mut store,
2254                function.as_non_null(),
2255                params.as_mut_slice().into(),
2256            )?;
2257            flags.set_may_enter(may_enter_after_call);
2258        }
2259        Ok(params[0].get_u32())
2260    }
2261
2262    /// Start a guest->guest call previously prepared using
2263    /// `Self::prepare_call`.
2264    ///
2265    /// This is called from fused adapter code generated in
2266    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2267    /// this function immediately after calling `Self::prepare_call`.
2268    ///
2269    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2270    /// functions with the appropriate signatures for the current guest task.
2271    /// If this is a call to an async-lowered import, the actual call may be
2272    /// deferred and run after this function returns, in which case the pointer
2273    /// arguments must also be valid when the call happens.
2274    unsafe fn start_call<T: 'static>(
2275        self,
2276        mut store: StoreContextMut<T>,
2277        callback: *mut VMFuncRef,
2278        post_return: *mut VMFuncRef,
2279        callee: *mut VMFuncRef,
2280        param_count: u32,
2281        result_count: u32,
2282        flags: u32,
2283        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2284    ) -> Result<u32> {
2285        let token = StoreToken::new(store.as_context_mut());
2286        let async_caller = storage.is_none();
2287        let state = store.0.concurrent_state_mut();
2288        let guest_thread = state.guest_thread.unwrap();
2289        let may_enter_after_call = state
2290            .get_mut(guest_thread.task)?
2291            .call_post_return_automatically();
2292        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2293        let param_count = usize::try_from(param_count).unwrap();
2294        assert!(param_count <= MAX_FLAT_PARAMS);
2295        let result_count = usize::try_from(result_count).unwrap();
2296        assert!(result_count <= MAX_FLAT_RESULTS);
2297
2298        let task = state.get_mut(guest_thread.task)?;
2299        if !callback.is_null() {
2300            // We're calling an async-lifted export with a callback, so store
2301            // the callback and related context as part of the task so we can
2302            // call it later when needed.
2303            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2304            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2305                let store = token.as_context_mut(store);
2306                unsafe {
2307                    self.call_callback::<T>(
2308                        store,
2309                        runtime_instance,
2310                        callback,
2311                        event,
2312                        handle,
2313                        may_enter_after_call,
2314                    )
2315                }
2316            }));
2317        }
2318
2319        let Caller::Guest {
2320            thread: caller,
2321            instance: runtime_instance,
2322        } = &task.caller
2323        else {
2324            // As of this writing, `start_call` is only used for guest->guest
2325            // calls.
2326            unreachable!()
2327        };
2328        let caller = *caller;
2329        let caller_instance = *runtime_instance;
2330
2331        let callee_instance = task.instance;
2332
2333        let instance_flags = if callback.is_null() {
2334            None
2335        } else {
2336            Some(self.id().get(store.0).instance_flags(callee_instance))
2337        };
2338
2339        // Queue the call as a "high priority" work item.
2340        unsafe {
2341            self.queue_call(
2342                store.as_context_mut(),
2343                guest_thread,
2344                callee,
2345                param_count,
2346                result_count,
2347                instance_flags,
2348                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2349                NonNull::new(callback).map(SendSyncPtr::new),
2350                NonNull::new(post_return).map(SendSyncPtr::new),
2351            )?;
2352        }
2353
2354        let state = store.0.concurrent_state_mut();
2355
2356        // Use the caller's `GuestTask::sync_call_set` to register interest in
2357        // the subtask...
2358        let guest_waitable = Waitable::Guest(guest_thread.task);
2359        let old_set = guest_waitable.common(state)?.set;
2360        let set = state.get_mut(caller.task)?.sync_call_set;
2361        guest_waitable.join(state, Some(set))?;
2362
2363        // ... and suspend this fiber temporarily while we wait for it to start.
2364        //
2365        // Note that we _could_ call the callee directly using the current fiber
2366        // rather than suspend this one, but that would make reasoning about the
2367        // event loop more complicated and is probably only worth doing if
2368        // there's a measurable performance benefit.  In addition, it would mean
2369        // blocking the caller if the callee calls a blocking sync-lowered
2370        // import, and as of this writing the spec says we must not do that.
2371        //
2372        // Alternatively, the fused adapter code could be modified to call the
2373        // callee directly without calling a host-provided intrinsic at all (in
2374        // which case it would need to do its own, inline backpressure checks,
2375        // etc.).  Again, we'd want to see a measurable performance benefit
2376        // before committing to such an optimization.  And again, we'd need to
2377        // update the spec to allow that.
2378        let (status, waitable) = loop {
2379            store.0.suspend(SuspendReason::Waiting {
2380                set,
2381                thread: caller,
2382            })?;
2383
2384            let state = store.0.concurrent_state_mut();
2385
2386            log::trace!("taking event for {:?}", guest_thread.task);
2387            let event = guest_waitable.take_event(state)?;
2388            let Some(Event::Subtask { status }) = event else {
2389                unreachable!();
2390            };
2391
2392            log::trace!("status {status:?} for {:?}", guest_thread.task);
2393
2394            if status == Status::Returned {
2395                // It returned, so we can stop waiting.
2396                break (status, None);
2397            } else if async_caller {
2398                // It hasn't returned yet, but the caller is calling via an
2399                // async-lowered import, so we generate a handle for the task
2400                // waitable and return the status.
2401                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2402                    .subtask_insert_guest(guest_thread.task.rep())?;
2403                store
2404                    .0
2405                    .concurrent_state_mut()
2406                    .get_mut(guest_thread.task)?
2407                    .common
2408                    .handle = Some(handle);
2409                break (status, Some(handle));
2410            } else {
2411                // The callee hasn't returned yet, and the caller is calling via
2412                // a sync-lowered import, so we loop and keep waiting until the
2413                // callee returns.
2414            }
2415        };
2416
2417        let state = store.0.concurrent_state_mut();
2418
2419        guest_waitable.join(state, old_set)?;
2420
2421        if let Some(storage) = storage {
2422            // The caller used a sync-lowered import to call an async-lifted
2423            // export, in which case the result, if any, has been stashed in
2424            // `GuestTask::sync_result`.
2425            let task = state.get_mut(guest_thread.task)?;
2426            if let Some(result) = task.sync_result.take() {
2427                if let Some(result) = result {
2428                    storage[0] = MaybeUninit::new(result);
2429                }
2430
2431                if task.exited {
2432                    if task.ready_to_delete() {
2433                        Waitable::Guest(guest_thread.task).delete_from(state)?;
2434                    }
2435                }
2436            }
2437        }
2438
2439        // Reset the current thread to point to the caller as it resumes control.
2440        state.guest_thread = Some(caller);
2441        state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2442        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2443
2444        Ok(status.pack(waitable))
2445    }
2446
2447    /// Poll the specified future once on behalf of a guest->host call using an
2448    /// async-lowered import.
2449    ///
2450    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2451    /// `Pending`, add it to the set of futures to be polled as part of this
2452    /// instance's event loop until it completes, and then return
2453    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2454    ///
2455    /// Whether the future returns `Ready` immediately or later, the `lower`
2456    /// function will be used to lower the result, if any, into the guest caller's
2457    /// stack and linear memory unless the task has been cancelled.
2458    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2459        self,
2460        mut store: StoreContextMut<T>,
2461        future: impl Future<Output = Result<R>> + Send + 'static,
2462        caller_instance: RuntimeComponentInstanceIndex,
2463        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2464    ) -> Result<Option<u32>> {
2465        let token = StoreToken::new(store.as_context_mut());
2466        let state = store.0.concurrent_state_mut();
2467        let caller = state.guest_thread.unwrap();
2468
2469        // Create an abortable future which hooks calls to poll and manages call
2470        // context state for the future.
2471        let (join_handle, future) = JoinHandle::run(async move {
2472            let mut future = pin!(future);
2473            let mut call_context = None;
2474            future::poll_fn(move |cx| {
2475                // Push the call context for managing any resource borrows
2476                // for the task.
2477                tls::get(|store| {
2478                    if let Some(call_context) = call_context.take() {
2479                        token
2480                            .as_context_mut(store)
2481                            .0
2482                            .component_resource_state()
2483                            .0
2484                            .push(call_context);
2485                    }
2486                });
2487
2488                let result = future.as_mut().poll(cx);
2489
2490                if result.is_pending() {
2491                    // Pop the call context for managing any resource
2492                    // borrows for the task.
2493                    tls::get(|store| {
2494                        call_context = Some(
2495                            token
2496                                .as_context_mut(store)
2497                                .0
2498                                .component_resource_state()
2499                                .0
2500                                .pop()
2501                                .unwrap(),
2502                        );
2503                    });
2504                }
2505                result
2506            })
2507            .await
2508        });
2509
2510        // We create a new host task even though it might complete immediately
2511        // (in which case we won't need to pass a waitable back to the guest).
2512        // If it does complete immediately, we'll remove it before we return.
2513        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2514
2515        log::trace!("new host task child of {caller:?}: {task:?}");
2516
2517        let mut future = Box::pin(future);
2518
2519        // Finally, poll the future.  We can use a dummy `Waker` here because
2520        // we'll add the future to `ConcurrentState::futures` and poll it
2521        // automatically from the event loop if it doesn't complete immediately
2522        // here.
2523        let poll = tls::set(store.0, || {
2524            future
2525                .as_mut()
2526                .poll(&mut Context::from_waker(&Waker::noop()))
2527        });
2528
2529        Ok(match poll {
2530            Poll::Ready(None) => unreachable!(),
2531            Poll::Ready(Some(result)) => {
2532                // It finished immediately; lower the result and delete the
2533                // task.
2534                lower(store.as_context_mut(), result?)?;
2535                log::trace!("delete host task {task:?} (already ready)");
2536                store.0.concurrent_state_mut().delete(task)?;
2537                None
2538            }
2539            Poll::Pending => {
2540                // It hasn't finished yet; add the future to
2541                // `ConcurrentState::futures` so it will be polled by the event
2542                // loop and allocate a waitable handle to return to the guest.
2543
2544                // Wrap the future in a closure responsible for lowering the result into
2545                // the guest's stack and memory, as well as notifying any waiters that
2546                // the task returned.
2547                let future =
2548                    Box::pin(async move {
2549                        let result = match future.await {
2550                            Some(result) => result?,
2551                            // Task was cancelled; nothing left to do.
2552                            None => return Ok(()),
2553                        };
2554                        tls::get(move |store| {
2555                            // Here we schedule a task to run on a worker fiber to do
2556                            // the lowering since it may involve a call to the guest's
2557                            // realloc function.  This is necessary because calling the
2558                            // guest while there are host embedder frames on the stack
2559                            // is unsound.
2560                            store.concurrent_state_mut().push_high_priority(
2561                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2562                                    lower(token.as_context_mut(store), result)?;
2563                                    let state = store.concurrent_state_mut();
2564                                    state.get_mut(task)?.join_handle.take();
2565                                    Waitable::Host(task).set_event(
2566                                        state,
2567                                        Some(Event::Subtask {
2568                                            status: Status::Returned,
2569                                        }),
2570                                    )
2571                                }))),
2572                            );
2573                            Ok(())
2574                        })
2575                    });
2576
2577                store.0.concurrent_state_mut().push_future(future);
2578                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2579                    .subtask_insert_host(task.rep())?;
2580                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2581                log::trace!(
2582                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2583                );
2584                Some(handle)
2585            }
2586        })
2587    }
2588
2589    /// Implements the `task.return` intrinsic, lifting the result for the
2590    /// current guest task.
2591    pub(crate) fn task_return(
2592        self,
2593        store: &mut dyn VMStore,
2594        caller: RuntimeComponentInstanceIndex,
2595        ty: TypeTupleIndex,
2596        options: OptionsIndex,
2597        storage: &[ValRaw],
2598    ) -> Result<()> {
2599        self.id().get(store).check_may_leave(caller)?;
2600        let state = store.concurrent_state_mut();
2601        let guest_thread = state.guest_thread.unwrap();
2602        let lift = state
2603            .get_mut(guest_thread.task)?
2604            .lift_result
2605            .take()
2606            .ok_or_else(|| {
2607                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2608            })?;
2609        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2610
2611        let CanonicalOptions {
2612            string_encoding,
2613            data_model,
2614            ..
2615        } = &self.id().get(store).component().env_component().options[options];
2616
2617        let invalid = ty != lift.ty
2618            || string_encoding != &lift.string_encoding
2619            || match data_model {
2620                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2621                    Some(memory) => {
2622                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2623                        let actual = self.id().get(store).runtime_memory(memory);
2624                        expected != actual
2625                    }
2626                    // Memory not specified, meaning it didn't need to be
2627                    // specified per validation, so not invalid.
2628                    None => false,
2629                },
2630                // Always invalid as this isn't supported.
2631                CanonicalOptionsDataModel::Gc { .. } => true,
2632            };
2633
2634        if invalid {
2635            bail!("invalid `task.return` signature and/or options for current task");
2636        }
2637
2638        log::trace!("task.return for {guest_thread:?}");
2639
2640        let result = (lift.lift)(store, storage)?;
2641        self.task_complete(
2642            store,
2643            guest_thread.task,
2644            result,
2645            Status::Returned,
2646            ValRaw::i32(0),
2647        )
2648    }
2649
2650    /// Implements the `task.cancel` intrinsic.
2651    pub(crate) fn task_cancel(
2652        self,
2653        store: &mut StoreOpaque,
2654        caller: RuntimeComponentInstanceIndex,
2655    ) -> Result<()> {
2656        self.id().get(store).check_may_leave(caller)?;
2657        let state = store.concurrent_state_mut();
2658        let guest_thread = state.guest_thread.unwrap();
2659        let task = state.get_mut(guest_thread.task)?;
2660        if !task.cancel_sent {
2661            bail!("`task.cancel` called by task which has not been cancelled")
2662        }
2663        _ = task.lift_result.take().ok_or_else(|| {
2664            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2665        })?;
2666
2667        assert!(task.result.is_none());
2668
2669        log::trace!("task.cancel for {guest_thread:?}");
2670
2671        self.task_complete(
2672            store,
2673            guest_thread.task,
2674            Box::new(DummyResult),
2675            Status::ReturnCancelled,
2676            ValRaw::i32(0),
2677        )
2678    }
2679
2680    /// Complete the specified guest task (i.e. indicate that it has either
2681    /// returned a (possibly empty) result or cancelled itself).
2682    ///
2683    /// This will return any resource borrows and notify any current or future
2684    /// waiters that the task has completed.
2685    fn task_complete(
2686        self,
2687        store: &mut StoreOpaque,
2688        guest_task: TableId<GuestTask>,
2689        result: Box<dyn Any + Send + Sync>,
2690        status: Status,
2691        post_return_arg: ValRaw,
2692    ) -> Result<()> {
2693        if store
2694            .concurrent_state_mut()
2695            .get_mut(guest_task)?
2696            .call_post_return_automatically()
2697        {
2698            let (calls, host_table, _, instance) =
2699                store.component_resource_state_with_instance(self);
2700            ResourceTables {
2701                calls,
2702                host_table: Some(host_table),
2703                guest: Some(instance.guest_tables()),
2704            }
2705            .exit_call()?;
2706        } else {
2707            // As of this writing, the only scenario where `call_post_return_automatically`
2708            // would be false for a `GuestTask` is for host-to-guest calls using
2709            // `[Typed]Func::call_async`, in which case the `function_index`
2710            // should be a non-`None` value.
2711            let function_index = store
2712                .concurrent_state_mut()
2713                .get_mut(guest_task)?
2714                .function_index
2715                .unwrap();
2716            self.id()
2717                .get_mut(store)
2718                .post_return_arg_set(function_index, post_return_arg);
2719        }
2720
2721        let state = store.concurrent_state_mut();
2722        let task = state.get_mut(guest_task)?;
2723
2724        if let Caller::Host { tx, .. } = &mut task.caller {
2725            if let Some(tx) = tx.take() {
2726                _ = tx.send(result);
2727            }
2728        } else {
2729            task.result = Some(result);
2730            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2731        }
2732
2733        Ok(())
2734    }
2735
2736    /// Implements the `waitable-set.new` intrinsic.
2737    pub(crate) fn waitable_set_new(
2738        self,
2739        store: &mut StoreOpaque,
2740        caller_instance: RuntimeComponentInstanceIndex,
2741    ) -> Result<u32> {
2742        self.id().get_mut(store).check_may_leave(caller_instance)?;
2743        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2744        let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2745            .waitable_set_insert(set.rep())?;
2746        log::trace!("new waitable set {set:?} (handle {handle})");
2747        Ok(handle)
2748    }
2749
2750    /// Implements the `waitable-set.drop` intrinsic.
2751    pub(crate) fn waitable_set_drop(
2752        self,
2753        store: &mut StoreOpaque,
2754        caller_instance: RuntimeComponentInstanceIndex,
2755        set: u32,
2756    ) -> Result<()> {
2757        self.id().get_mut(store).check_may_leave(caller_instance)?;
2758        let rep =
2759            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2760
2761        log::trace!("drop waitable set {rep} (handle {set})");
2762
2763        let set = store
2764            .concurrent_state_mut()
2765            .delete(TableId::<WaitableSet>::new(rep))?;
2766
2767        if !set.waiting.is_empty() {
2768            bail!("cannot drop waitable set with waiters");
2769        }
2770
2771        Ok(())
2772    }
2773
2774    /// Implements the `waitable.join` intrinsic.
2775    pub(crate) fn waitable_join(
2776        self,
2777        store: &mut StoreOpaque,
2778        caller_instance: RuntimeComponentInstanceIndex,
2779        waitable_handle: u32,
2780        set_handle: u32,
2781    ) -> Result<()> {
2782        let mut instance = self.id().get_mut(store);
2783        instance.check_may_leave(caller_instance)?;
2784        let waitable =
2785            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2786
2787        let set = if set_handle == 0 {
2788            None
2789        } else {
2790            let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2791
2792            Some(TableId::<WaitableSet>::new(set))
2793        };
2794
2795        log::trace!(
2796            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2797        );
2798
2799        waitable.join(store.concurrent_state_mut(), set)
2800    }
2801
2802    /// Implements the `subtask.drop` intrinsic.
2803    pub(crate) fn subtask_drop(
2804        self,
2805        store: &mut StoreOpaque,
2806        caller_instance: RuntimeComponentInstanceIndex,
2807        task_id: u32,
2808    ) -> Result<()> {
2809        self.id().get_mut(store).check_may_leave(caller_instance)?;
2810        self.waitable_join(store, caller_instance, task_id, 0)?;
2811
2812        let (rep, is_host) =
2813            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2814
2815        let concurrent_state = store.concurrent_state_mut();
2816        let (waitable, expected_caller_instance, delete) = if is_host {
2817            let id = TableId::<HostTask>::new(rep);
2818            let task = concurrent_state.get_mut(id)?;
2819            if task.join_handle.is_some() {
2820                bail!("cannot drop a subtask which has not yet resolved");
2821            }
2822            (Waitable::Host(id), task.caller_instance, true)
2823        } else {
2824            let id = TableId::<GuestTask>::new(rep);
2825            let task = concurrent_state.get_mut(id)?;
2826            if task.lift_result.is_some() {
2827                bail!("cannot drop a subtask which has not yet resolved");
2828            }
2829            if let Caller::Guest { instance, .. } = &task.caller {
2830                (Waitable::Guest(id), *instance, task.exited)
2831            } else {
2832                unreachable!()
2833            }
2834        };
2835
2836        waitable.common(concurrent_state)?.handle = None;
2837
2838        if waitable.take_event(concurrent_state)?.is_some() {
2839            bail!("cannot drop a subtask with an undelivered event");
2840        }
2841
2842        if delete {
2843            waitable.delete_from(concurrent_state)?;
2844        }
2845
2846        // Since waitables can neither be passed between instances nor forged,
2847        // this should never fail unless there's a bug in Wasmtime, but we check
2848        // here to be sure:
2849        assert_eq!(expected_caller_instance, caller_instance);
2850        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2851        Ok(())
2852    }
2853
2854    /// Implements the `waitable-set.wait` intrinsic.
2855    pub(crate) fn waitable_set_wait(
2856        self,
2857        store: &mut StoreOpaque,
2858        caller: RuntimeComponentInstanceIndex,
2859        options: OptionsIndex,
2860        set: u32,
2861        payload: u32,
2862    ) -> Result<u32> {
2863        self.id().get(store).check_may_leave(caller)?;
2864        let &CanonicalOptions {
2865            cancellable,
2866            instance: caller_instance,
2867            ..
2868        } = &self.id().get(store).component().env_component().options[options];
2869        let rep =
2870            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2871
2872        self.waitable_check(
2873            store,
2874            cancellable,
2875            WaitableCheck::Wait(WaitableCheckParams {
2876                set: TableId::new(rep),
2877                options,
2878                payload,
2879            }),
2880        )
2881    }
2882
2883    /// Implements the `waitable-set.poll` intrinsic.
2884    pub(crate) fn waitable_set_poll(
2885        self,
2886        store: &mut StoreOpaque,
2887        caller: RuntimeComponentInstanceIndex,
2888        options: OptionsIndex,
2889        set: u32,
2890        payload: u32,
2891    ) -> Result<u32> {
2892        self.id().get(store).check_may_leave(caller)?;
2893        let &CanonicalOptions {
2894            cancellable,
2895            instance: caller_instance,
2896            ..
2897        } = &self.id().get(store).component().env_component().options[options];
2898        let rep =
2899            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2900
2901        self.waitable_check(
2902            store,
2903            cancellable,
2904            WaitableCheck::Poll(WaitableCheckParams {
2905                set: TableId::new(rep),
2906                options,
2907                payload,
2908            }),
2909        )
2910    }
2911
2912    /// Implements the `thread.index` intrinsic.
2913    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2914        let thread_id = store
2915            .concurrent_state_mut()
2916            .guest_thread
2917            .ok_or_else(|| anyhow!("no current thread"))?
2918            .thread;
2919        // The unwrap is safe because `instance_rep` must be `Some` by this point
2920        Ok(store
2921            .concurrent_state_mut()
2922            .get_mut(thread_id)?
2923            .instance_rep
2924            .unwrap())
2925    }
2926
2927    /// Implements the `thread.new_indirect` intrinsic.
2928    pub(crate) fn thread_new_indirect<T: 'static>(
2929        self,
2930        mut store: StoreContextMut<T>,
2931        runtime_instance: RuntimeComponentInstanceIndex,
2932        _func_ty_idx: TypeFuncIndex, // currently unused
2933        start_func_table_idx: RuntimeTableIndex,
2934        start_func_idx: u32,
2935        context: i32,
2936    ) -> Result<u32> {
2937        self.id().get(store.0).check_may_leave(runtime_instance)?;
2938
2939        log::trace!("creating new thread");
2940
2941        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
2942        let instance = self.id().get_mut(store.0);
2943        let callee = instance
2944            .index_runtime_func_table(start_func_table_idx, start_func_idx as u64)?
2945            .ok_or_else(|| {
2946                anyhow!("the start function index points to an uninitialized function")
2947            })?;
2948        if callee.type_index(store.0) != start_func_ty.type_index() {
2949            bail!(
2950                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
2951            );
2952        }
2953
2954        let token = StoreToken::new(store.as_context_mut());
2955        let start_func = Box::new(
2956            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
2957                let old_thread = store
2958                    .concurrent_state_mut()
2959                    .guest_thread
2960                    .replace(guest_thread);
2961                log::trace!(
2962                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
2963                );
2964
2965                store.maybe_push_call_context(guest_thread.task)?;
2966
2967                let mut store = token.as_context_mut(store);
2968                let mut params = [ValRaw::i32(context)];
2969                // Use call_unchecked rather than call or call_async, as we don't want to run the function
2970                // on a separate fiber if we're running in an async store.
2971                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
2972
2973                store.0.maybe_pop_call_context(guest_thread.task)?;
2974
2975                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
2976                log::trace!("explicit thread {guest_thread:?} completed");
2977                let state = store.0.concurrent_state_mut();
2978                let task = state.get_mut(guest_thread.task)?;
2979                if task.threads.is_empty() && !task.returned_or_cancelled() {
2980                    bail!(Trap::NoAsyncResult);
2981                }
2982                state.guest_thread = old_thread;
2983                old_thread
2984                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2985                if state.get_mut(guest_thread.task)?.ready_to_delete() {
2986                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2987                }
2988                log::trace!("thread start: restored {old_thread:?} as current thread");
2989
2990                Ok(())
2991            },
2992        );
2993
2994        let state = store.0.concurrent_state_mut();
2995        let current_thread = state.guest_thread.unwrap();
2996        let parent_task = current_thread.task;
2997
2998        let new_thread = GuestThread::new_explicit(parent_task, start_func);
2999        let thread_id = state.push(new_thread)?;
3000        state.get_mut(parent_task)?.threads.insert(thread_id);
3001
3002        log::trace!("new thread with id {thread_id:?} created");
3003
3004        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3005    }
3006
3007    pub(crate) fn resume_suspended_thread(
3008        self,
3009        store: &mut StoreOpaque,
3010        runtime_instance: RuntimeComponentInstanceIndex,
3011        thread_idx: u32,
3012        high_priority: bool,
3013    ) -> Result<()> {
3014        let thread_id =
3015            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3016        let state = store.concurrent_state_mut();
3017        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3018        let thread = state.get_mut(guest_thread.thread)?;
3019
3020        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3021            GuestThreadState::NotStartedExplicit(start_func) => {
3022                log::trace!("starting thread {guest_thread:?}");
3023                let guest_call = WorkItem::GuestCall(GuestCall {
3024                    thread: guest_thread,
3025                    kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3026                        start_func(store, guest_thread)
3027                    })),
3028                });
3029                store
3030                    .concurrent_state_mut()
3031                    .push_work_item(guest_call, high_priority);
3032            }
3033            GuestThreadState::Suspended(fiber) => {
3034                log::trace!("resuming thread {thread_id:?} that was suspended");
3035                store
3036                    .concurrent_state_mut()
3037                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3038            }
3039            _ => {
3040                bail!("cannot resume thread which is not suspended");
3041            }
3042        }
3043        Ok(())
3044    }
3045
3046    fn add_guest_thread_to_instance_table(
3047        self,
3048        thread_id: TableId<GuestThread>,
3049        store: &mut StoreOpaque,
3050        runtime_instance: RuntimeComponentInstanceIndex,
3051    ) -> Result<u32> {
3052        let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3053            .guest_thread_insert(thread_id.rep())?;
3054        store
3055            .concurrent_state_mut()
3056            .get_mut(thread_id)?
3057            .instance_rep = Some(guest_id);
3058        Ok(guest_id)
3059    }
3060
3061    /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3062    /// and `thread.switch-to` intrinsics.
3063    pub(crate) fn suspension_intrinsic(
3064        self,
3065        store: &mut StoreOpaque,
3066        caller: RuntimeComponentInstanceIndex,
3067        cancellable: bool,
3068        yielding: bool,
3069        to_thread: Option<u32>,
3070    ) -> Result<WaitResult> {
3071        // There could be a pending cancellation from a previous uncancellable wait
3072        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3073            return Ok(WaitResult::Cancelled);
3074        }
3075
3076        self.id().get(store).check_may_leave(caller)?;
3077
3078        if let Some(thread) = to_thread {
3079            self.resume_suspended_thread(store, caller, thread, true)?;
3080        }
3081
3082        let state = store.concurrent_state_mut();
3083        let guest_thread = state.guest_thread.unwrap();
3084        let reason = if yielding {
3085            SuspendReason::Yielding {
3086                thread: guest_thread,
3087            }
3088        } else {
3089            SuspendReason::ExplicitlySuspending {
3090                thread: guest_thread,
3091            }
3092        };
3093
3094        store.suspend(reason)?;
3095
3096        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3097            Ok(WaitResult::Cancelled)
3098        } else {
3099            Ok(WaitResult::Completed)
3100        }
3101    }
3102
    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
    ///
    /// The current thread first yields.  For `WaitableCheck::Wait`, if no
    /// deliverable event is available it then suspends until one is.  Finally
    /// the event (or `Event::None` for a poll that found nothing) is written to
    /// guest memory at `params.payload` as two little-endian `u32`s -- the
    /// waitable handle followed by the event result -- and the event's ordinal
    /// is returned.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, or if a wait resumes
    /// without an event to deliver.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();

        // Both variants carry a set, so `set` is always `Some` here.
        let (wait, set) = match &check {
            WaitableCheck::Wait(params) => (true, Some(params.set)),
            WaitableCheck::Poll(params) => (false, Some(params.set)),
        };

        log::trace!("waitable check for {guest_thread:?}; set {set:?}");
        // First, suspend this fiber, allowing any other threads to run.
        store.suspend(SuspendReason::Yielding {
            thread: guest_thread,
        })?;

        // NOTE(review): this trace repeats the message logged before the
        // yield above, so the two are indistinguishable in log output.
        log::trace!("waitable check for {guest_thread:?}; set {set:?}");

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        // If we're waiting, and there are no events immediately available,
        // suspend the fiber until that changes.
        if wait {
            let set = set.unwrap();

            // A pending `Event::Cancelled` only counts as deliverable when the
            // wait is cancellable; otherwise it is treated like "no event".
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Record which set this thread is waiting on so that
                    // `subtask_cancel` can wake it.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                }

                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                })?;
            }
        }

        log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");

        let result = match check {
            // Deliver any pending events to the guest and return.
            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
                let event =
                    self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

                let (ordinal, handle, result) = if wait {
                    // A wait only gets here once an event is available.
                    let (event, waitable) = event.unwrap();
                    // Events without an associated waitable report handle 0.
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    if let Some((event, waitable)) = event {
                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                        let (ordinal, result) = event.parts();
                        (ordinal, handle, result)
                    } else {
                        log::trace!(
                            "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                            guest_thread.task,
                            params.set
                        );
                        // A poll with nothing ready reports `Event::None`.
                        let (ordinal, result) = Event::None.parts();
                        (ordinal, 0, result)
                    }
                };
                let options = Options::new_index(store, self, params.options);
                // Validate the guest-provided payload address before writing
                // the two `u32` results through it.
                let ptr = func::validate_inbounds::<(u32, u32)>(
                    options.memory_mut(store),
                    &ValRaw::u32(params.payload),
                )?;
                options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
                options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
                Ok(ordinal)
            }
        };

        result
    }
3193
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// `task_id` is a subtask handle in `caller_instance`'s guest table; it may
    /// refer to either a host task or a guest task.
    ///
    /// Returns a `Status` value cast to `u32`, or `BLOCKED` when `async_` is
    /// set and the guest subtask has not yet returned or been cancelled.
    ///
    /// # Errors
    ///
    /// Fails if `check_may_leave` rejects `caller_instance`, if the handle
    /// cannot be resolved, or if the subtask has no terminal
    /// (`Returned`/`ReturnCancelled`) event left to deliver.
    ///
    /// # Panics
    ///
    /// Panics if the subtask's recorded caller instance differs from
    /// `caller_instance`, or (for guest subtasks) if there is no current guest
    /// thread.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        self.id().get(store).check_may_leave(caller_instance)?;
        let (rep, is_host) =
            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
        // Resolve the handle to a waitable and the instance recorded as its
        // caller.
        let (waitable, expected_caller_instance) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller_instance,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { instance, .. } =
                &store.concurrent_state_mut().get_mut(id)?.caller
            {
                (Waitable::Guest(id), *instance)
            } else {
                unreachable!()
            }
        };
        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        assert_eq!(expected_caller_instance, caller_instance);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let concurrent_state = store.concurrent_state_mut();
        if let Waitable::Host(host_task) = waitable {
            // A host task that still has a join handle is aborted on the
            // spot.  Without one, fall through to the event check at the end.
            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
                handle.abort();
                return Ok(Status::ReturnCancelled as u32);
            }
        } else {
            let caller = concurrent_state.guest_thread.unwrap();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                task.lower_params = None;
                task.lift_result = None;

                // Not yet started; cancel and remove from pending
                let callee_instance = task.instance;

                let pending = &mut concurrent_state.instance_state(callee_instance).pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // If there were no pending threads for this task, we're in an error state
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                // Wake the first thread of the task that registered a
                // `wake_on_cancel` set, then yield so that it gets to run.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            }),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding { thread: caller })?;
                        break;
                    }
                }

                // Re-borrow the state: the suspend above may have let other
                // work run in the meantime.
                let concurrent_state = store.concurrent_state_mut();
                let task = concurrent_state.get_mut(guest_task)?;
                if !task.returned_or_cancelled() {
                    if async_ {
                        return Ok(BLOCKED);
                    } else {
                        store.wait_for_event(Waitable::Guest(guest_task))?;
                    }
                }
            }
        }

        // Only a terminal subtask status is accepted here; any other event,
        // or no event at all, is reported as an error.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3317
3318    pub(crate) fn context_get(
3319        self,
3320        store: &mut StoreOpaque,
3321        caller: RuntimeComponentInstanceIndex,
3322        slot: u32,
3323    ) -> Result<u32> {
3324        self.id().get(store).check_may_leave(caller)?;
3325        store.concurrent_state_mut().context_get(slot)
3326    }
3327
3328    pub(crate) fn context_set(
3329        self,
3330        store: &mut StoreOpaque,
3331        caller: RuntimeComponentInstanceIndex,
3332        slot: u32,
3333        value: u32,
3334    ) -> Result<()> {
3335        self.id().get(store).check_may_leave(caller)?;
3336        store.concurrent_state_mut().context_set(slot, value)
3337    }
3338}
3339
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// Every method takes the [`Instance`] being operated on in addition to the
/// store itself (`&mut self`).
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is used only when at least one of them is async.
    ///
    /// `storage` must point to at least `storage_len` valid `ValRaw` items.
    /// `result_count` doubles as a marker for async callers: the `StoreInner`
    /// implementation treats `PREPARE_ASYNC_NO_RESULT` and
    /// `PREPARE_ASYNC_WITH_RESULT` as "async caller" and any other value as a
    /// sync caller's result count.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage` must point to at least `storage_len` items.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single payload item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single payload item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new_indirect` intrinsic.
    ///
    /// Returns the index of the newly created thread.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3509
/// Implementation of the async intrinsics for a concrete store.  Each method
/// wraps `self` in a `StoreContextMut` (or passes it directly) and forwards to
/// the corresponding method on `Instance`.
///
/// SAFETY: See trait docs.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    /// Copies the caller's parameters out of `storage` and forwards to
    /// `Instance::prepare_call`, decoding the final `u32` argument into a
    /// `CallerInfo`.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: The `wasmtime_cranelift`-generated code that calls
        // this method will have ensured that `storage` is a valid
        // pointer containing at least `storage_len` items.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                memory,
                string_encoding,
                // The two `PREPARE_ASYNC_*` sentinels mark an async caller
                // (without/with a result); any other value is the result
                // count of a sync caller.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    /// Forwards to `Instance::start_call` with a null `post_return`, a result
    /// count of 1, and the `START_FLAG_ASYNC_CALLEE` flag, passing `storage`
    /// as a slice and discarding the returned value.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
                    // this method will have ensured that `storage` is a valid
                    // pointer containing at least `storage_len` items.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    /// Forwards to `Instance::start_call` without a storage slice, returning
    /// its `u32` result unchanged.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    /// `future.write`: a `guest_write` on a future with no flat ABI and a
    /// count of 1; the result is returned in its encoded `u32` form.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// `future.read`: a `guest_read` on a future with no flat ABI and a count
    /// of 1; the result is returned in its encoded `u32` form.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// `stream.write`: a `guest_write` of `count` items on a stream with no
    /// flat ABI; the result is returned in its encoded `u32` form.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// `stream.read`: a `guest_read` of `count` items on a stream with no
    /// flat ABI; the result is returned in its encoded `u32` form.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// `future.drop-writable`: forwards to `guest_drop_writable` for a future.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    /// Flat-payload variant of `stream.write`: same as `stream_write` except
    /// that a `FlatAbi` built from `payload_size`/`payload_align` is supplied.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Flat-payload variant of `stream.read`: same as `stream_read` except
    /// that a `FlatAbi` built from `payload_size`/`payload_align` is supplied.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// `stream.drop-writable`: forwards to `guest_drop_writable` for a stream.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    /// `error-context.debug-message`: forwards to
    /// `Instance::error_context_debug_message` after the may-leave check.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    /// `thread.new_indirect`: forwards to `Instance::thread_new_indirect`.
    /// No may-leave check is made here; the callee performs its own.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
3823
3824type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3825
3826/// Represents the state of a pending host task.
3827struct HostTask {
3828    common: WaitableCommon,
3829    caller_instance: RuntimeComponentInstanceIndex,
3830    join_handle: Option<JoinHandle>,
3831}
3832
3833impl HostTask {
3834    fn new(
3835        caller_instance: RuntimeComponentInstanceIndex,
3836        join_handle: Option<JoinHandle>,
3837    ) -> Self {
3838        Self {
3839            common: WaitableCommon::default(),
3840            caller_instance,
3841            join_handle,
3842        }
3843    }
3844}
3845
3846impl TableDebug for HostTask {
3847    fn type_name() -> &'static str {
3848        "HostTask"
3849    }
3850}
3851
3852type CallbackFn = Box<
3853    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3854        + Send
3855        + Sync
3856        + 'static,
3857>;
3858
3859/// Represents the caller of a given guest task.
3860enum Caller {
3861    /// The host called the guest task.
3862    Host {
3863        /// If present, may be used to deliver the result.
3864        tx: Option<oneshot::Sender<LiftedResult>>,
3865        /// Channel to notify once all subtasks spawned by this caller have
3866        /// completed.
3867        ///
3868        /// Note that we'll never actually send anything to this channel;
3869        /// dropping it when the refcount goes to zero is sufficient to notify
3870        /// the receiver.
3871        exit_tx: Arc<oneshot::Sender<()>>,
3872        /// If true, there's a host future that must be dropped before the task
3873        /// can be deleted.
3874        host_future_present: bool,
3875        /// If true, call `post-return` function (if any) automatically.
3876        call_post_return_automatically: bool,
3877    },
3878    /// Another guest thread called the guest task
3879    Guest {
3880        /// The id of the caller
3881        thread: QualifiedThreadId,
3882        /// The instance to use to enforce reentrance rules.
3883        ///
3884        /// Note that this might not be the same as the instance the caller task
3885        /// started executing in given that one or more synchronous guest->guest
3886        /// calls may have occurred involving multiple instances.
3887        instance: RuntimeComponentInstanceIndex,
3888    },
3889}
3890
3891/// Represents a closure and related canonical ABI parameters required to
3892/// validate a `task.return` call at runtime and lift the result.
3893struct LiftResult {
3894    lift: RawLift,
3895    ty: TypeTupleIndex,
3896    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
3897    string_encoding: StringEncoding,
3898}
3899
3900/// The table ID for a guest thread, qualified by the task to which it belongs.
3901///
3902/// This exists to minimize table lookups and the necessity to pass stores around mutably
3903/// for the common case of identifying the task to which a thread belongs.
3904#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
3905struct QualifiedThreadId {
3906    task: TableId<GuestTask>,
3907    thread: TableId<GuestThread>,
3908}
3909
3910impl QualifiedThreadId {
3911    fn qualify(
3912        state: &mut ConcurrentState,
3913        thread: TableId<GuestThread>,
3914    ) -> Result<QualifiedThreadId> {
3915        Ok(QualifiedThreadId {
3916            task: state.get_mut(thread)?.parent_task,
3917            thread,
3918        })
3919    }
3920}
3921
3922impl fmt::Debug for QualifiedThreadId {
3923    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3924        f.debug_tuple("QualifiedThreadId")
3925            .field(&self.task.rep())
3926            .field(&self.thread.rep())
3927            .finish()
3928    }
3929}
3930
3931enum GuestThreadState {
3932    NotStartedImplicit,
3933    NotStartedExplicit(
3934        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
3935    ),
3936    Running,
3937    Suspended(StoreFiber<'static>),
3938    Pending,
3939    Completed,
3940}
3941pub struct GuestThread {
3942    /// Context-local state used to implement the `context.{get,set}`
3943    /// intrinsics.
3944    context: [u32; 2],
3945    /// The owning guest task.
3946    parent_task: TableId<GuestTask>,
3947    /// If present, indicates that the thread is currently waiting on the
3948    /// specified set but may be cancelled and woken immediately.
3949    wake_on_cancel: Option<TableId<WaitableSet>>,
3950    /// The execution state of this guest thread
3951    state: GuestThreadState,
3952    /// The index of this thread in the component instance's handle table.
3953    /// This must always be `Some` after initialization.
3954    instance_rep: Option<u32>,
3955}
3956
3957impl GuestThread {
3958    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
3959    /// handle.
3960    fn from_instance(
3961        state: Pin<&mut ComponentInstance>,
3962        caller_instance: RuntimeComponentInstanceIndex,
3963        guest_thread: u32,
3964    ) -> Result<TableId<Self>> {
3965        let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
3966        Ok(TableId::new(rep))
3967    }
3968
3969    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
3970        Self {
3971            context: [0; 2],
3972            parent_task,
3973            wake_on_cancel: None,
3974            state: GuestThreadState::NotStartedImplicit,
3975            instance_rep: None,
3976        }
3977    }
3978
3979    fn new_explicit(
3980        parent_task: TableId<GuestTask>,
3981        start_func: Box<
3982            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
3983        >,
3984    ) -> Self {
3985        Self {
3986            context: [0; 2],
3987            parent_task,
3988            wake_on_cancel: None,
3989            state: GuestThreadState::NotStartedExplicit(start_func),
3990            instance_rep: None,
3991        }
3992    }
3993}
3994
3995impl TableDebug for GuestThread {
3996    fn type_name() -> &'static str {
3997        "GuestThread"
3998    }
3999}
4000
4001enum SyncResult {
4002    NotProduced,
4003    Produced(Option<ValRaw>),
4004    Taken,
4005}
4006
4007impl SyncResult {
4008    fn take(&mut self) -> Option<Option<ValRaw>> {
4009        match mem::replace(self, SyncResult::Taken) {
4010            SyncResult::NotProduced => None,
4011            SyncResult::Produced(val) => Some(val),
4012            SyncResult::Taken => {
4013                panic!("attempted to take a synchronous result that was already taken")
4014            }
4015        }
4016    }
4017}
4018
/// Tracks the host future (if any) associated with a `GuestTask`.
#[derive(Debug)]
enum HostFutureState {
    /// The task has no host future to wait for.
    NotApplicable,
    /// A host future exists and has not been dropped; while in this state the
    /// task is not ready to be deleted.
    Live,
    /// The host future has been dropped.
    Dropped,
}
4025
4026/// Represents a pending guest task.
4027pub(crate) struct GuestTask {
4028    /// See `WaitableCommon`
4029    common: WaitableCommon,
4030    /// Closure to lower the parameters passed to this task.
4031    lower_params: Option<RawLower>,
4032    /// See `LiftResult`
4033    lift_result: Option<LiftResult>,
4034    /// A place to stash the type-erased lifted result if it can't be delivered
4035    /// immediately.
4036    result: Option<LiftedResult>,
4037    /// Closure to call the callback function for an async-lifted export, if
4038    /// provided.
4039    callback: Option<CallbackFn>,
4040    /// See `Caller`
4041    caller: Caller,
4042    /// A place to stash the call context for managing resource borrows while
4043    /// switching between guest tasks.
4044    call_context: Option<CallContext>,
4045    /// A place to stash the lowered result for a sync-to-async call until it
4046    /// can be returned to the caller.
4047    sync_result: SyncResult,
4048    /// Whether or not the task has been cancelled (i.e. whether the task is
4049    /// permitted to call `task.cancel`).
4050    cancel_sent: bool,
4051    /// Whether or not we've sent a `Status::Starting` event to any current or
4052    /// future waiters for this waitable.
4053    starting_sent: bool,
4054    /// Pending guest subtasks created by this task (directly or indirectly).
4055    ///
4056    /// This is used to re-parent subtasks which are still running when their
4057    /// parent task is disposed.
4058    subtasks: HashSet<TableId<GuestTask>>,
4059    /// Scratch waitable set used to watch subtasks during synchronous calls.
4060    sync_call_set: TableId<WaitableSet>,
4061    /// The instance to which the exported function for this guest task belongs.
4062    ///
4063    /// Note that the task may do a sync->sync call via a fused adapter which
4064    /// results in that task executing code in a different instance, and it may
4065    /// call host functions and intrinsics from that other instance.
4066    instance: RuntimeComponentInstanceIndex,
4067    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4068    /// delivered to this task.
4069    event: Option<Event>,
4070    /// The `ExportIndex` of the guest function being called, if known.
4071    function_index: Option<ExportIndex>,
4072    /// Whether or not the task has exited.
4073    exited: bool,
4074    /// Threads belonging to this task
4075    threads: HashSet<TableId<GuestThread>>,
4076    /// The state of the host future that represents an async task, which must
4077    /// be dropped before we can delete the task.
4078    host_future_state: HostFutureState,
4079}
4080
4081impl GuestTask {
4082    fn already_lowered_parameters(&self) -> bool {
4083        // We reset `lower_params` after we lower the parameters
4084        self.lower_params.is_none()
4085    }
4086    fn returned_or_cancelled(&self) -> bool {
4087        // We reset `lift_result` after we return or exit
4088        self.lift_result.is_none()
4089    }
4090    fn ready_to_delete(&self) -> bool {
4091        let threads_completed = self.threads.is_empty();
4092        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4093        let pending_completion_event = matches!(
4094            self.common.event,
4095            Some(Event::Subtask {
4096                status: Status::Returned | Status::ReturnCancelled
4097            })
4098        );
4099        let ready = threads_completed
4100            && !has_sync_result
4101            && !pending_completion_event
4102            && !matches!(self.host_future_state, HostFutureState::Live);
4103        log::trace!(
4104            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4105            threads_completed,
4106            has_sync_result,
4107            pending_completion_event,
4108            self.host_future_state
4109        );
4110        ready
4111    }
4112    fn new(
4113        state: &mut ConcurrentState,
4114        lower_params: RawLower,
4115        lift_result: LiftResult,
4116        caller: Caller,
4117        callback: Option<CallbackFn>,
4118        component_instance: RuntimeComponentInstanceIndex,
4119    ) -> Result<Self> {
4120        let sync_call_set = state.push(WaitableSet::default())?;
4121        let host_future_state = match &caller {
4122            Caller::Guest { .. } => HostFutureState::NotApplicable,
4123            Caller::Host {
4124                host_future_present,
4125                ..
4126            } => {
4127                if *host_future_present {
4128                    HostFutureState::Live
4129                } else {
4130                    HostFutureState::NotApplicable
4131                }
4132            }
4133        };
4134        Ok(Self {
4135            common: WaitableCommon::default(),
4136            lower_params: Some(lower_params),
4137            lift_result: Some(lift_result),
4138            result: None,
4139            callback,
4140            caller,
4141            call_context: Some(CallContext::default()),
4142            sync_result: SyncResult::NotProduced,
4143            cancel_sent: false,
4144            starting_sent: false,
4145            subtasks: HashSet::new(),
4146            sync_call_set,
4147            instance: component_instance,
4148            event: None,
4149            function_index: None,
4150            exited: false,
4151            threads: HashSet::new(),
4152            host_future_state,
4153        })
4154    }
4155
4156    /// Dispose of this guest task, reparenting any pending subtasks to the
4157    /// caller.
4158    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4159        // If there are not-yet-delivered completion events for subtasks in
4160        // `self.sync_call_set`, recursively dispose of those subtasks as well.
4161        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4162            if let Some(Event::Subtask {
4163                status: Status::Returned | Status::ReturnCancelled,
4164            }) = waitable.common(state)?.event
4165            {
4166                waitable.delete_from(state)?;
4167            }
4168        }
4169
4170        assert!(self.threads.is_empty());
4171
4172        state.delete(self.sync_call_set)?;
4173
4174        // Reparent any pending subtasks to the caller.
4175        match &self.caller {
4176            Caller::Guest {
4177                thread,
4178                instance: runtime_instance,
4179            } => {
4180                let task_mut = state.get_mut(thread.task)?;
4181                let present = task_mut.subtasks.remove(&me);
4182                assert!(present);
4183
4184                for subtask in &self.subtasks {
4185                    task_mut.subtasks.insert(*subtask);
4186                }
4187
4188                for subtask in &self.subtasks {
4189                    state.get_mut(*subtask)?.caller = Caller::Guest {
4190                        thread: *thread,
4191                        instance: *runtime_instance,
4192                    };
4193                }
4194            }
4195            Caller::Host { exit_tx, .. } => {
4196                for subtask in &self.subtasks {
4197                    state.get_mut(*subtask)?.caller = Caller::Host {
4198                        tx: None,
4199                        // Clone `exit_tx` to ensure that it is only dropped
4200                        // once all transitive subtasks of the host call have
4201                        // exited:
4202                        exit_tx: exit_tx.clone(),
4203                        host_future_present: false,
4204                        call_post_return_automatically: true,
4205                    };
4206                }
4207            }
4208        }
4209
4210        for subtask in self.subtasks {
4211            if state.get_mut(subtask)?.exited {
4212                Waitable::Guest(subtask).delete_from(state)?;
4213            }
4214        }
4215
4216        Ok(())
4217    }
4218
4219    fn call_post_return_automatically(&self) -> bool {
4220        matches!(
4221            self.caller,
4222            Caller::Guest { .. }
4223                | Caller::Host {
4224                    call_post_return_automatically: true,
4225                    ..
4226                }
4227        )
4228    }
4229}
4230
4231impl TableDebug for GuestTask {
4232    fn type_name() -> &'static str {
4233        "GuestTask"
4234    }
4235}
4236
4237/// Represents state common to all kinds of waitables.
4238#[derive(Default)]
4239struct WaitableCommon {
4240    /// The currently pending event for this waitable, if any.
4241    event: Option<Event>,
4242    /// The set to which this waitable belongs, if any.
4243    set: Option<TableId<WaitableSet>>,
4244    /// The handle with which the guest refers to this waitable, if any.
4245    handle: Option<u32>,
4246}
4247
4248/// Represents a Component Model Async `waitable`.
4249#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4250enum Waitable {
4251    /// A host task
4252    Host(TableId<HostTask>),
4253    /// A guest task
4254    Guest(TableId<GuestTask>),
4255    /// The read or write end of a stream or future
4256    Transmit(TableId<TransmitHandle>),
4257}
4258
4259impl Waitable {
4260    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4261    /// handle.
4262    fn from_instance(
4263        state: Pin<&mut ComponentInstance>,
4264        caller_instance: RuntimeComponentInstanceIndex,
4265        waitable: u32,
4266    ) -> Result<Self> {
4267        use crate::runtime::vm::component::Waitable;
4268
4269        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4270
4271        Ok(match kind {
4272            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4273            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4274            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4275        })
4276    }
4277
4278    /// Retrieve the host-visible identifier for this `Waitable`.
4279    fn rep(&self) -> u32 {
4280        match self {
4281            Self::Host(id) => id.rep(),
4282            Self::Guest(id) => id.rep(),
4283            Self::Transmit(id) => id.rep(),
4284        }
4285    }
4286
4287    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4288    /// remove it from any set it may currently belong to (when `set` is
4289    /// `None`).
4290    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4291        log::trace!("waitable {self:?} join set {set:?}",);
4292
4293        let old = mem::replace(&mut self.common(state)?.set, set);
4294
4295        if let Some(old) = old {
4296            match *self {
4297                Waitable::Host(id) => state.remove_child(id, old),
4298                Waitable::Guest(id) => state.remove_child(id, old),
4299                Waitable::Transmit(id) => state.remove_child(id, old),
4300            }?;
4301
4302            state.get_mut(old)?.ready.remove(self);
4303        }
4304
4305        if let Some(set) = set {
4306            match *self {
4307                Waitable::Host(id) => state.add_child(id, set),
4308                Waitable::Guest(id) => state.add_child(id, set),
4309                Waitable::Transmit(id) => state.add_child(id, set),
4310            }?;
4311
4312            if self.common(state)?.event.is_some() {
4313                self.mark_ready(state)?;
4314            }
4315        }
4316
4317        Ok(())
4318    }
4319
4320    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4321    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4322        Ok(match self {
4323            Self::Host(id) => &mut state.get_mut(*id)?.common,
4324            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4325            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4326        })
4327    }
4328
4329    /// Set or clear the pending event for this waitable and either deliver it
4330    /// to the first waiter, if any, or mark it as ready to be delivered to the
4331    /// next waiter that arrives.
4332    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4333        log::trace!("set event for {self:?}: {event:?}");
4334        self.common(state)?.event = event;
4335        self.mark_ready(state)
4336    }
4337
4338    /// Take the pending event from this waitable, leaving `None` in its place.
4339    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4340        let common = self.common(state)?;
4341        let event = common.event.take();
4342        if let Some(set) = self.common(state)?.set {
4343            state.get_mut(set)?.ready.remove(self);
4344        }
4345
4346        Ok(event)
4347    }
4348
4349    /// Deliver the current event for this waitable to the first waiter, if any,
4350    /// or else mark it as ready to be delivered to the next waiter that
4351    /// arrives.
4352    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4353        if let Some(set) = self.common(state)?.set {
4354            state.get_mut(set)?.ready.insert(*self);
4355            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4356                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4357                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4358
4359                let item = match mode {
4360                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4361                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4362                        thread,
4363                        kind: GuestCallKind::DeliverEvent {
4364                            instance,
4365                            set: Some(set),
4366                        },
4367                    }),
4368                };
4369                state.push_high_priority(item);
4370            }
4371        }
4372        Ok(())
4373    }
4374
4375    /// Handle the imminent delivery of the specified event, e.g. by updating
4376    /// the state of the stream or future.
4377    fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
4378        match event {
4379            Event::FutureRead {
4380                pending: Some((ty, handle)),
4381                ..
4382            }
4383            | Event::FutureWrite {
4384                pending: Some((ty, handle)),
4385                ..
4386            } => {
4387                let runtime_instance = instance.component().types()[ty].instance;
4388                let (rep, state) = instance.guest_tables().0[runtime_instance]
4389                    .future_rep(ty, handle)
4390                    .unwrap();
4391                assert_eq!(rep, self.rep());
4392                assert_eq!(*state, TransmitLocalState::Busy);
4393                *state = match event {
4394                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4395                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4396                    _ => unreachable!(),
4397                };
4398            }
4399            Event::StreamRead {
4400                pending: Some((ty, handle)),
4401                code,
4402            }
4403            | Event::StreamWrite {
4404                pending: Some((ty, handle)),
4405                code,
4406            } => {
4407                let runtime_instance = instance.component().types()[ty].instance;
4408                let (rep, state) = instance.guest_tables().0[runtime_instance]
4409                    .stream_rep(ty, handle)
4410                    .unwrap();
4411                assert_eq!(rep, self.rep());
4412                assert_eq!(*state, TransmitLocalState::Busy);
4413                let done = matches!(code, ReturnCode::Dropped(_));
4414                *state = match event {
4415                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
4416                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4417                    _ => unreachable!(),
4418                };
4419            }
4420            _ => {}
4421        }
4422    }
4423
4424    /// Remove this waitable from the instance's rep table.
4425    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4426        match self {
4427            Self::Host(task) => {
4428                log::trace!("delete host task {task:?}");
4429                state.delete(*task)?;
4430            }
4431            Self::Guest(task) => {
4432                log::trace!("delete guest task {task:?}");
4433                state.delete(*task)?.dispose(state, *task)?;
4434            }
4435            Self::Transmit(task) => {
4436                state.delete(*task)?;
4437            }
4438        }
4439
4440        Ok(())
4441    }
4442}
4443
4444impl fmt::Debug for Waitable {
4445    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4446        match self {
4447            Self::Host(id) => write!(f, "{id:?}"),
4448            Self::Guest(id) => write!(f, "{id:?}"),
4449            Self::Transmit(id) => write!(f, "{id:?}"),
4450        }
4451    }
4452}
4453
4454/// Represents a Component Model Async `waitable-set`.
4455#[derive(Default)]
4456struct WaitableSet {
4457    /// Which waitables in this set have pending events, if any.
4458    ready: BTreeSet<Waitable>,
4459    /// Which guest threads are currently waiting on this set, if any.
4460    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4461}
4462
4463impl TableDebug for WaitableSet {
4464    fn type_name() -> &'static str {
4465        "WaitableSet"
4466    }
4467}
4468
4469/// Type-erased closure to lower the parameters for a guest task.
4470type RawLower =
4471    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4472
4473/// Type-erased closure to lift the result for a guest task.
4474type RawLift = Box<
4475    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4476>;
4477
/// The type-erased result of a guest task.
///
/// A host caller downcasts this to the type it expects; for a guest caller it
/// is simply ignored (see `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;
4482
/// Placeholder returned from a `LiftFn` when the real result has already been
/// lowered into a guest task's stack and linear memory, leaving nothing
/// meaningful to hand back.
struct DummyResult;
4486
4487/// Represents the Component Model Async state of a (sub-)component instance.
4488#[derive(Default)]
4489struct InstanceState {
4490    /// Whether backpressure is set for this instance (enabled if >0)
4491    backpressure: u16,
4492    /// Whether this instance can be entered
4493    do_not_enter: bool,
4494    /// Pending calls for this instance which require `Self::backpressure` to be
4495    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4496    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4497}
4498
4499/// Represents the Component Model Async state of a store.
4500pub struct ConcurrentState {
4501    /// The currently running guest thread, if any.
4502    guest_thread: Option<QualifiedThreadId>,
4503
4504    /// The set of pending host and background tasks, if any.
4505    ///
4506    /// See `ComponentInstance::poll_until` for where we temporarily take this
4507    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4508    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4509    /// The table of waitables, waitable sets, etc.
4510    table: AlwaysMut<ResourceTable>,
4511    /// Per (sub-)component instance states.
4512    ///
4513    /// See `InstanceState` for details and note that this map is lazily
4514    /// populated as needed.
4515    // TODO: this can and should be a `PrimaryMap`
4516    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4517    /// The "high priority" work queue for this store's event loop.
4518    high_priority: Vec<WorkItem>,
4519    /// The "low priority" work queue for this store's event loop.
4520    low_priority: Vec<WorkItem>,
4521    /// A place to stash the reason a fiber is suspending so that the code which
4522    /// resumed it will know under what conditions the fiber should be resumed
4523    /// again.
4524    suspend_reason: Option<SuspendReason>,
4525    /// A cached fiber which is waiting for work to do.
4526    ///
4527    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4528    worker: Option<StoreFiber<'static>>,
4529    /// A place to stash the work item for which we're resuming a worker fiber.
4530    worker_item: Option<WorkerItem>,
4531
4532    /// Reference counts for all component error contexts
4533    ///
4534    /// NOTE: it is possible the global ref count to be *greater* than the sum of
4535    /// (sub)component ref counts as tracked by `error_context_tables`, for
4536    /// example when the host holds one or more references to error contexts.
4537    ///
4538    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4539    /// component-wide representation) of the index into concurrent state for a given
4540    /// stored `ErrorContext`.
4541    ///
4542    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4543    /// as a `TableId<ErrorContextState>`.
4544    global_error_context_ref_counts:
4545        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4546}
4547
4548impl Default for ConcurrentState {
4549    fn default() -> Self {
4550        Self {
4551            guest_thread: None,
4552            table: AlwaysMut::new(ResourceTable::new()),
4553            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4554            instance_states: HashMap::new(),
4555            high_priority: Vec::new(),
4556            low_priority: Vec::new(),
4557            suspend_reason: None,
4558            worker: None,
4559            worker_item: None,
4560            global_error_context_ref_counts: BTreeMap::new(),
4561        }
4562    }
4563}
4564
4565impl ConcurrentState {
4566    /// Take ownership of any fibers and futures owned by this object.
4567    ///
4568    /// This should be used when disposing of the `Store` containing this object
4569    /// in order to gracefully resolve any and all fibers using
4570    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4571    /// use-after-free bugs due to fibers which may still have access to the
4572    /// `Store`.
4573    ///
4574    /// Additionally, the futures collected with this function should be dropped
4575    /// within a `tls::set` call, which will ensure than any futures closing
4576    /// over an `&Accessor` will have access to the store when dropped, allowing
4577    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4578    /// panicking.
4579    ///
4580    /// Note that this will leave the object in an inconsistent and unusable
4581    /// state, so it should only be used just prior to dropping it.
4582    pub(crate) fn take_fibers_and_futures(
4583        &mut self,
4584        fibers: &mut Vec<StoreFiber<'static>>,
4585        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4586    ) {
4587        for entry in self.table.get_mut().iter_mut() {
4588            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4589                for mode in mem::take(&mut set.waiting).into_values() {
4590                    if let WaitMode::Fiber(fiber) = mode {
4591                        fibers.push(fiber);
4592                    }
4593                }
4594            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4595                if let GuestThreadState::Suspended(fiber) =
4596                    mem::replace(&mut thread.state, GuestThreadState::Completed)
4597                {
4598                    fibers.push(fiber);
4599                }
4600            }
4601        }
4602
4603        if let Some(fiber) = self.worker.take() {
4604            fibers.push(fiber);
4605        }
4606
4607        let mut take_items = |list| {
4608            for item in mem::take(list) {
4609                match item {
4610                    WorkItem::ResumeFiber(fiber) => {
4611                        fibers.push(fiber);
4612                    }
4613                    WorkItem::PushFuture(future) => {
4614                        self.futures
4615                            .get_mut()
4616                            .as_mut()
4617                            .unwrap()
4618                            .push(future.into_inner());
4619                    }
4620                    _ => {}
4621                }
4622            }
4623        };
4624
4625        take_items(&mut self.high_priority);
4626        take_items(&mut self.low_priority);
4627
4628        if let Some(them) = self.futures.get_mut().take() {
4629            futures.push(them);
4630        }
4631    }
4632
4633    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4634        self.instance_states.entry(instance).or_default()
4635    }
4636
4637    fn push<V: Send + Sync + 'static>(
4638        &mut self,
4639        value: V,
4640    ) -> Result<TableId<V>, ResourceTableError> {
4641        self.table.get_mut().push(value).map(TableId::from)
4642    }
4643
4644    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4645        self.table.get_mut().get_mut(&Resource::from(id))
4646    }
4647
4648    pub fn add_child<T: 'static, U: 'static>(
4649        &mut self,
4650        child: TableId<T>,
4651        parent: TableId<U>,
4652    ) -> Result<(), ResourceTableError> {
4653        self.table
4654            .get_mut()
4655            .add_child(Resource::from(child), Resource::from(parent))
4656    }
4657
4658    pub fn remove_child<T: 'static, U: 'static>(
4659        &mut self,
4660        child: TableId<T>,
4661        parent: TableId<U>,
4662    ) -> Result<(), ResourceTableError> {
4663        self.table
4664            .get_mut()
4665            .remove_child(Resource::from(child), Resource::from(parent))
4666    }
4667
4668    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4669        self.table.get_mut().delete(Resource::from(id))
4670    }
4671
4672    fn push_future(&mut self, future: HostTaskFuture) {
4673        // Note that we can't directly push to `ConcurrentState::futures` here
4674        // since this may be called from a future that's being polled inside
4675        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4676        // so it has exclusive access while polling it.  Therefore, we push a
4677        // work item to the "high priority" queue, which will actually push to
4678        // `ConcurrentState::futures` later.
4679        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4680    }
4681
4682    fn push_high_priority(&mut self, item: WorkItem) {
4683        log::trace!("push high priority: {item:?}");
4684        self.high_priority.push(item);
4685    }
4686
4687    fn push_low_priority(&mut self, item: WorkItem) {
4688        log::trace!("push low priority: {item:?}");
4689        self.low_priority.push(item);
4690    }
4691
4692    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4693        if high_priority {
4694            self.push_high_priority(item);
4695        } else {
4696            self.push_low_priority(item);
4697        }
4698    }
4699
4700    /// Determine whether the instance associated with the specified guest task
4701    /// may be entered (i.e. is not already on the async call stack).
4702    ///
4703    /// This is an additional check on top of the "may_enter" instance flag;
4704    /// it's needed because async-lifted exports with callback functions must
4705    /// not call their own instances directly or indirectly, and due to the
4706    /// "stackless" nature of callback-enabled guest tasks this may happen even
4707    /// if there are no activation records on the stack (i.e. the "may_enter"
4708    /// field is `true`) for that instance.
4709    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4710        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4711
4712        // Walk the task tree back to the root, looking for potential
4713        // reentrance.
4714        //
4715        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4716        // such that each bit represents and instance which has been entered by
4717        // that task or an ancestor of that task, in which case this would be a
4718        // constant time check.
4719        loop {
4720            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4721                Caller::Host { .. } => break true,
4722                Caller::Guest { thread, instance } => {
4723                    if *instance == guest_instance {
4724                        break false;
4725                    } else {
4726                        *thread
4727                    }
4728                }
4729            };
4730            guest_task = next_thread.task;
4731        }
4732    }
4733
4734    /// Record that we're about to enter a (sub-)component instance which does
4735    /// not support more than one concurrent, stackful activation, meaning it
4736    /// cannot be entered again until the next call returns.
4737    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4738        self.instance_state(instance).do_not_enter = true;
4739    }
4740
4741    /// Record that we've exited a (sub-)component instance previously entered
4742    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
4743    /// See the documentation for the latter for details.
4744    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4745        self.instance_state(instance).do_not_enter = false;
4746        self.partition_pending(instance)
4747    }
4748
4749    /// Iterate over `InstanceState::pending`, moving any ready items into the
4750    /// "high priority" work item queue.
4751    ///
4752    /// See `GuestCall::is_ready` for details.
4753    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4754        for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4755            let call = GuestCall { thread, kind };
4756            if call.is_ready(self)? {
4757                self.push_high_priority(WorkItem::GuestCall(call));
4758            } else {
4759                self.instance_state(instance)
4760                    .pending
4761                    .insert(call.thread, call.kind);
4762            }
4763        }
4764
4765        Ok(())
4766    }
4767
4768    /// Implements the `backpressure.{set,inc,dec}` intrinsics.
4769    pub(crate) fn backpressure_modify(
4770        &mut self,
4771        caller_instance: RuntimeComponentInstanceIndex,
4772        modify: impl FnOnce(u16) -> Option<u16>,
4773    ) -> Result<()> {
4774        let state = self.instance_state(caller_instance);
4775        let old = state.backpressure;
4776        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4777        state.backpressure = new;
4778
4779        if old > 0 && new == 0 {
4780            // Backpressure was previously enabled and is now disabled; move any
4781            // newly-eligible guest calls to the "high priority" queue.
4782            self.partition_pending(caller_instance)?;
4783        }
4784
4785        Ok(())
4786    }
4787
4788    /// Implements the `context.get` intrinsic.
4789    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4790        let thread = self.guest_thread.unwrap();
4791        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4792        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4793        Ok(val)
4794    }
4795
4796    /// Implements the `context.set` intrinsic.
4797    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4798        let thread = self.guest_thread.unwrap();
4799        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4800        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4801        Ok(())
4802    }
4803
4804    /// Returns whether there's a pending cancellation on the current guest thread,
4805    /// consuming the event if so.
4806    fn take_pending_cancellation(&mut self) -> bool {
4807        let thread = self.guest_thread.unwrap();
4808        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4809            assert!(matches!(event, Event::Cancelled));
4810            true
4811        } else {
4812            false
4813        }
4814    }
4815}
4816
4817/// Provide a type hint to compiler about the shape of a parameter lower
4818/// closure.
4819fn for_any_lower<
4820    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4821>(
4822    fun: F,
4823) -> F {
4824    fun
4825}
4826
4827/// Provide a type hint to compiler about the shape of a result lift closure.
4828fn for_any_lift<
4829    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4830>(
4831    fun: F,
4832) -> F {
4833    fun
4834}
4835
4836/// Wrap the specified future in a `poll_fn` which asserts that the future is
4837/// only polled from the event loop of the specified `Store`.
4838///
4839/// See `StoreContextMut::run_concurrent` for details.
4840fn checked<F: Future + Send + 'static>(
4841    id: StoreId,
4842    fut: F,
4843) -> impl Future<Output = F::Output> + Send + 'static {
4844    async move {
4845        let mut fut = pin!(fut);
4846        future::poll_fn(move |cx| {
4847            let message = "\
4848                `Future`s which depend on asynchronous component tasks, streams, or \
4849                futures to complete may only be polled from the event loop of the \
4850                store to which they belong.  Please use \
4851                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4852            ";
4853            tls::try_get(|store| {
4854                let matched = match store {
4855                    tls::TryGet::Some(store) => store.id() == id,
4856                    tls::TryGet::Taken | tls::TryGet::None => false,
4857                };
4858
4859                if !matched {
4860                    panic!("{message}")
4861                }
4862            });
4863            fut.as_mut().poll(cx)
4864        })
4865        .await
4866    }
4867}
4868
4869/// Assert that `StoreContextMut::run_concurrent` has not been called from
4870/// within an store's event loop.
4871fn check_recursive_run() {
4872    tls::try_get(|store| {
4873        if !matches!(store, tls::TryGet::None) {
4874            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4875        }
4876    });
4877}
4878
/// Split a packed callback return code into its two components: the low four
/// bits, and the remaining upper bits shifted down by four.
// NOTE(review): presumably the low nibble is the callback code and the upper
// bits are its associated index, per the Component Model async ABI -- confirm.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low = code & 0xF;
    let high = code >> 4;
    (low, high)
}
4882
4883/// Helper struct for packaging parameters to be passed to
4884/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4885/// `waitable-set.poll`.
4886struct WaitableCheckParams {
4887    set: TableId<WaitableSet>,
4888    options: OptionsIndex,
4889    payload: u32,
4890}
4891
4892/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4893enum WaitableCheck {
4894    Wait(WaitableCheckParams),
4895    Poll(WaitableCheckParams),
4896}
4897
4898/// Represents a guest task called from the host, prepared using `prepare_call`.
4899pub(crate) struct PreparedCall<R> {
4900    /// The guest export to be called
4901    handle: Func,
4902    /// The guest thread created by `prepare_call`
4903    thread: QualifiedThreadId,
4904    /// The number of lowered core Wasm parameters to pass to the call.
4905    param_count: usize,
4906    /// The `oneshot::Receiver` to which the result of the call will be
4907    /// delivered when it is available.
4908    rx: oneshot::Receiver<LiftedResult>,
4909    /// The `oneshot::Receiver` which will resolve when the task -- and any
4910    /// transitive subtasks -- have all exited.
4911    exit_rx: oneshot::Receiver<()>,
4912    _phantom: PhantomData<R>,
4913}
4914
4915impl<R> PreparedCall<R> {
4916    /// Get a copy of the `TaskId` for this `PreparedCall`.
4917    pub(crate) fn task_id(&self) -> TaskId {
4918        TaskId {
4919            task: self.thread.task,
4920        }
4921    }
4922}
4923
4924/// Represents a task created by `prepare_call`.
4925pub(crate) struct TaskId {
4926    task: TableId<GuestTask>,
4927}
4928
4929impl TaskId {
4930    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
4931    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
4932    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
4933    /// and can be resumed by other tasks for this component, so we mark the future as dropped
4934    /// and delete the task when all threads are done.
4935    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4936        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
4937        if !task.already_lowered_parameters() {
4938            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4939        } else {
4940            task.host_future_state = HostFutureState::Dropped;
4941            if task.ready_to_delete() {
4942                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
4943            }
4944        }
4945        Ok(())
4946    }
4947}
4948
4949/// Prepare a call to the specified exported Wasm function, providing functions
4950/// for lowering the parameters and lifting the result.
4951///
4952/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
4953/// loop, use `queue_call`.
4954pub(crate) fn prepare_call<T, R>(
4955    mut store: StoreContextMut<T>,
4956    handle: Func,
4957    param_count: usize,
4958    host_future_present: bool,
4959    call_post_return_automatically: bool,
4960    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
4961    + Send
4962    + Sync
4963    + 'static,
4964    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
4965    + Send
4966    + Sync
4967    + 'static,
4968) -> Result<PreparedCall<R>> {
4969    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
4970
4971    let instance = handle.instance().id().get(store.0);
4972    let task_return_type = instance.component().types()[ty].results;
4973    let component_instance = raw_options.instance;
4974    let callback = options.callback();
4975    let memory = options.memory_raw().map(SendSyncPtr::new);
4976    let string_encoding = options.string_encoding();
4977    let token = StoreToken::new(store.as_context_mut());
4978    let state = store.0.concurrent_state_mut();
4979
4980    assert!(state.guest_thread.is_none());
4981
4982    let (tx, rx) = oneshot::channel();
4983    let (exit_tx, exit_rx) = oneshot::channel();
4984
4985    let mut task = GuestTask::new(
4986        state,
4987        Box::new(for_any_lower(move |store, params| {
4988            lower_params(handle, token.as_context_mut(store), params)
4989        })),
4990        LiftResult {
4991            lift: Box::new(for_any_lift(move |store, result| {
4992                lift_result(handle, store, result)
4993            })),
4994            ty: task_return_type,
4995            memory,
4996            string_encoding,
4997        },
4998        Caller::Host {
4999            tx: Some(tx),
5000            exit_tx: Arc::new(exit_tx),
5001            host_future_present,
5002            call_post_return_automatically,
5003        },
5004        callback.map(|callback| {
5005            let callback = SendSyncPtr::new(callback);
5006            let instance = handle.instance();
5007            Box::new(
5008                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5009                    let store = token.as_context_mut(store);
5010                    // SAFETY: Per the contract of `prepare_call`, the callback
5011                    // will remain valid at least as long is this task exists.
5012                    unsafe {
5013                        instance.call_callback(
5014                            store,
5015                            runtime_instance,
5016                            callback,
5017                            event,
5018                            handle,
5019                            call_post_return_automatically,
5020                        )
5021                    }
5022                },
5023            ) as CallbackFn
5024        }),
5025        component_instance,
5026    )?;
5027    task.function_index = Some(handle.index());
5028
5029    let task = state.push(task)?;
5030    let thread = state.push(GuestThread::new_implicit(task))?;
5031    state.get_mut(task)?.threads.insert(thread);
5032
5033    Ok(PreparedCall {
5034        handle,
5035        thread: QualifiedThreadId { task, thread },
5036        param_count,
5037        rx,
5038        exit_rx,
5039        _phantom: PhantomData,
5040    })
5041}
5042
5043/// Queue a call previously prepared using `prepare_call` to be run as part of
5044/// the associated `ComponentInstance`'s event loop.
5045///
5046/// The returned future will resolve to the result once it is available, but
5047/// must only be polled via the instance's event loop. See
5048/// `StoreContextMut::run_concurrent` for details.
5049pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5050    mut store: StoreContextMut<T>,
5051    prepared: PreparedCall<R>,
5052) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5053    let PreparedCall {
5054        handle,
5055        thread,
5056        param_count,
5057        rx,
5058        exit_rx,
5059        ..
5060    } = prepared;
5061
5062    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5063
5064    Ok(checked(
5065        store.0.id(),
5066        rx.map(move |result| {
5067            result
5068                .map(|v| (*v.downcast().unwrap(), exit_rx))
5069                .map_err(anyhow::Error::from)
5070        }),
5071    ))
5072}
5073
5074/// Queue a call previously prepared using `prepare_call` to be run as part of
5075/// the associated `ComponentInstance`'s event loop.
5076fn queue_call0<T: 'static>(
5077    store: StoreContextMut<T>,
5078    handle: Func,
5079    guest_thread: QualifiedThreadId,
5080    param_count: usize,
5081) -> Result<()> {
5082    let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
5083    let is_concurrent = raw_options.async_;
5084    let instance = handle.instance();
5085    let callee = handle.lifted_core_func(store.0);
5086    let callback = options.callback();
5087    let post_return = handle.post_return_core_func(store.0);
5088
5089    log::trace!("queueing call {guest_thread:?}");
5090
5091    let instance_flags = if callback.is_none() {
5092        None
5093    } else {
5094        Some(flags)
5095    };
5096
5097    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5098    // (with signatures appropriate for this call) and will remain valid as
5099    // long as this instance is valid.
5100    unsafe {
5101        instance.queue_call(
5102            store,
5103            guest_thread,
5104            SendSyncPtr::new(callee),
5105            param_count,
5106            1,
5107            instance_flags,
5108            is_concurrent,
5109            callback.map(SendSyncPtr::new),
5110            post_return.map(SendSyncPtr::new),
5111        )
5112    }
5113}