Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61#[cfg(feature = "gc")]
62use crate::module::ModuleRegistry;
63use crate::prelude::*;
64use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
65#[cfg(feature = "gc")]
66use crate::vm::GcRootsList;
67use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
68use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
69use crate::{
70    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
71};
72use alloc::borrow::ToOwned;
73use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
74use core::any::Any;
75use core::cell::UnsafeCell;
76use core::fmt;
77use core::future;
78use core::future::Future;
79use core::marker::PhantomData;
80use core::mem::{self, ManuallyDrop, MaybeUninit};
81use core::ops::DerefMut;
82use core::pin::{Pin, pin};
83use core::ptr::{self, NonNull};
84use core::task::{Context, Poll, Waker};
85use futures::channel::oneshot;
86use futures::stream::{FuturesUnordered, StreamExt};
87use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
88use table::{TableDebug, TableId};
89use wasmtime_environ::component::{
90    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
91    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
98#[cfg(feature = "gc")]
99use wasmtime_unwinder::Unwind;
100
101pub use abort::JoinHandle;
102pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
103pub use future_stream_any::{FutureAny, StreamAny};
104pub use futures_and_streams::{
105    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
106    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
107    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
108};
109pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
110
111mod abort;
112mod error_contexts;
113mod func;
114mod future_stream_any;
115mod futures_and_streams;
116pub(crate) mod table;
117pub(crate) mod tls;
118
/// Sentinel defined by the Component Model spec which signals that an async
/// intrinsic (e.g. `future.write`) has not completed yet.
const BLOCKED: u32 = u32::MAX;
122
/// Mirrors `CallState` from the upstream spec.
///
/// The explicit discriminants are the integer values used when a status is
/// packed by [`Status::pack`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional `waitable` into the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// The status occupies the low 4 bits and the waitable, when present,
    /// the upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// `Status::Returned`, or if the waitable does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        let status_bits = self as u32;
        // Only a returned call comes without a waitable.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or_default();
        // The handle must leave the low 4 bits free for the status.
        assert!(waitable < (1 << 28));
        status_bits | (waitable << 4)
    }
}
146
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// [`Event::parts`] lowers each variant to an `(event code, payload)` pair of
/// integers; the per-variant notes below state the code used there. The
/// `pending` fields are not part of that pair.
/// NOTE(review): `pending` presumably identifies the stream/future handle
/// (table type plus index) the event refers to — confirm against its users.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// Event code 0, payload 0.
    None,
    /// Event code 1; the payload is `status` as an integer.
    Subtask {
        status: Status,
    },
    /// Event code 2; the payload is the encoded `code`.
    StreamRead {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Event code 3; the payload is the encoded `code`.
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Event code 4; the payload is the encoded `code`.
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Event code 5; the payload is the encoded `code`.
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Event code 6, payload 0.
    Cancelled,
}
173
174impl Event {
175    /// Lower this event to core Wasm integers for delivery to the guest.
176    ///
177    /// Note that the waitable handle, if any, is assumed to be lowered
178    /// separately.
179    fn parts(self) -> (u32, u32) {
180        const EVENT_NONE: u32 = 0;
181        const EVENT_SUBTASK: u32 = 1;
182        const EVENT_STREAM_READ: u32 = 2;
183        const EVENT_STREAM_WRITE: u32 = 3;
184        const EVENT_FUTURE_READ: u32 = 4;
185        const EVENT_FUTURE_WRITE: u32 = 5;
186        const EVENT_CANCELLED: u32 = 6;
187        match self {
188            Event::None => (EVENT_NONE, 0),
189            Event::Cancelled => (EVENT_CANCELLED, 0),
190            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
191            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
192            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
193            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
194            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
195        }
196    }
197}
198
/// Corresponds to `CallbackCode` in the spec.
///
/// NOTE(review): the per-constant meanings below are inferred from the names
/// only — confirm against the spec and the code that consumes these values.
mod callback_code {
    /// Presumably: the task is done and needs no further callbacks.
    pub const EXIT: u32 = 0;
    /// Presumably: the task yields so other tasks get a chance to run.
    pub const YIELD: u32 = 1;
    /// Presumably: the task waits for an event before being called again.
    pub const WAIT: u32 = 2;
}
205
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
/// The value comes from `wasmtime_environ` and is converted to `u32` here.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
210
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]).
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store context this access wraps.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's `T` to `D::Data<'_>`; applied by `get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
220
221impl<'a, T, D> Access<'a, T, D>
222where
223    D: HasData + ?Sized,
224    T: 'static,
225{
226    /// Creates a new [`Access`] from its component parts.
227    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
228        Self { store, get_data }
229    }
230
231    /// Get mutable access to the store data.
232    pub fn data_mut(&mut self) -> &mut T {
233        self.store.data_mut()
234    }
235
236    /// Get mutable access to the store data.
237    pub fn get(&mut self) -> D::Data<'_> {
238        (self.get_data)(self.data_mut())
239    }
240
241    /// Spawn a background task.
242    ///
243    /// See [`Accessor::spawn`] for details.
244    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
245    where
246        T: 'static,
247    {
248        let accessor = Accessor {
249            get_data: self.get_data,
250            token: StoreToken::new(self.store.as_context_mut()),
251        };
252        self.store
253            .as_context_mut()
254            .spawn_with_accessor(accessor, task)
255    }
256
257    /// Returns the getter this accessor is using to project from `T` into
258    /// `D::Data`.
259    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
260        self.get_data
261    }
262}
263
/// Lets an [`Access`] be passed wherever a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    // Delegates to the wrapped `StoreContextMut`.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
275
/// Lets an [`Access`] be passed wherever a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    // Delegates to the wrapped `StoreContextMut`.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
285
286/// Provides scoped mutable access to store data in the context of a concurrent
287/// host task future.
288///
289/// This allows multiple host task futures to execute concurrently and access
290/// the store between (but not across) `await` points.
291///
292/// # Rationale
293///
294/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
295/// `D::Data<'_>`. The problem this is solving, however, is that it does not
296/// literally store these values. The basic problem is that when a concurrent
297/// host future is being polled it has access to `&mut T` (and the whole
298/// `Store`) but when it's not being polled it does not have access to these
299/// values. This reflects how the store is only ever polling one future at a
300/// time so the store is effectively being passed between futures.
301///
302/// Rust's `Future` trait, however, has no means of passing a `Store`
303/// temporarily between futures. The [`Context`](core::task::Context) type does
304/// not have the ability to attach arbitrary information to it at this time.
305/// This type, [`Accessor`], is used to bridge this expressivity gap.
306///
307/// The [`Accessor`] type here represents the ability to acquire, temporarily in
308/// a synchronous manner, the current store. The [`Accessor::with`] function
309/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
310/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
311/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
313/// how the store is temporarily made available while a host future is being
314/// polled.
315///
316/// # Implementation
317///
318/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
319/// this type additionally doesn't even have a lifetime parameter. This is
320/// instead a representation of proof of the ability to acquire these while a
321/// future is being polled. Wasmtime will, when it polls a host future,
322/// configure ambient state such that the `Accessor` that a future closes over
323/// will work and be able to access the store.
324///
325/// This has a number of implications for users such as:
326///
327/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
328///   the lifetime of a single future.
329/// * A future is expected to, however, close over an `Accessor` and keep it
330///   alive probably for the duration of the entire future.
331/// * Different host futures will be given different `Accessor`s, and that's
332///   intentional.
333/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
334///   alleviates some otherwise required bounds to be written down.
335///
336/// # Using `Accessor` in `Drop`
337///
338/// The methods on `Accessor` are only expected to work in the context of
339/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
340/// host future can be dropped at any time throughout the system and Wasmtime
341/// store context is not necessarily available at that time. It's recommended to
342/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they may panic or have unintended results. If you run into this though
344/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by [`Accessor::with`] to turn the store obtained from TLS
    /// into a `StoreContextMut<'_, T>`.
    token: StoreToken<T>,
    /// Projection from the store's `T` to `D::Data<'_>`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
352
353/// A helper trait to take any type of accessor-with-data in functions.
354///
355/// This trait is similar to [`AsContextMut`] except that it's used when
356/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
357/// [`Accessor`] is the main type used in concurrent settings and is passed to
358/// functions such as [`Func::call_concurrent`].
359///
360/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
361/// this trait. This effectively means that regardless of the `D` in
362/// `Accessor<T, D>` it can still be passed to a function which just needs a
363/// store accessor.
364///
365/// Acquiring an [`Accessor`] can be done through
366/// [`StoreContextMut::run_concurrent`] for example or in a host function
367/// through
368/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The accessor is only borrowed, so the result cannot outlive `self`.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
380
381impl<T: AsAccessor + ?Sized> AsAccessor for &T {
382    type Data = T::Data;
383    type AccessorData = T::AccessorData;
384
385    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
386        T::as_accessor(self)
387    }
388}
389
/// An [`Accessor`] trivially refers to itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    // Identity: the accessor being referred to is `self`.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
398
399// Note that it is intentional at this time that `Accessor` does not actually
400// store `&mut T` or anything similar. This distinctly enables the `Accessor`
401// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
402// that matter). This is used to ergonomically simplify bindings where the
403// majority of the time `Accessor` is closed over in a future which then needs
404// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
405// you already have to write `T: 'static`...) it helps to avoid this.
406//
407// Note as well that `Accessor` doesn't actually store its data at all. Instead
408// it's more of a "proof" of what can be accessed from TLS. API design around
409// `Accessor` and functions like `Linker::func_wrap_concurrent` are
410// intentionally made to ensure that `Accessor` is ideally only used in the
411// context that TLS variables are actually set. For example host functions are
412// given `&Accessor`, not `Accessor`, and this prevents them from persisting
413// the value outside of a future. Within the future the TLS variables are all
414// guaranteed to be set while the future is being polled.
415//
416// Finally though this is not an ironclad guarantee, but nor does it need to be.
417// The TLS APIs are designed to panic or otherwise model usage where they're
418// called recursively or similar. It's hoped that code cannot be constructed to
419// actually hit this at runtime but this is not a safety requirement at this
420// time.
421const _: () = {
422    const fn assert<T: Send + Sync>() {}
423    assert::<Accessor<UnsafeCell<u32>>>();
424};
425
impl<T> Accessor<T> {
    /// Creates a new `Accessor` backed by the specified store token.
    ///
    /// - `token`: the store token later used by [`Accessor::with`] to obtain
    /// a `StoreContextMut<'_, T>`.
    ///
    /// The new accessor uses the identity projection, i.e. the data it exposes
    /// is the store's `T` itself. Use [`Accessor::with_getter`] to project to
    /// another type (e.g. a field of that data or a wrapper around it).
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            // Identity projection from `&mut T` to `&mut T`.
            get_data: |x| x,
        }
    }
}
442
443impl<T, D> Accessor<T, D>
444where
445    D: HasData + ?Sized,
446{
447    /// Run the specified closure, passing it mutable access to the store.
448    ///
449    /// This function is one of the main building blocks of the [`Accessor`]
450    /// type. This yields synchronous, blocking, access to the store via an
451    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
452    /// providing the ability to access `D` via [`Access::get`]. Note that the
453    /// `fun` here is given only temporary access to the store and `T`/`D`
454    /// meaning that the return value `R` here is not allowed to capture borrows
455    /// into the two. If access is needed to data within `T` or `D` outside of
456    /// this closure then it must be `clone`d out, for example.
457    ///
458    /// # Panics
459    ///
    /// This function will panic if it is called recursively with any other
461    /// accessor already in scope. For example if `with` is called within `fun`,
462    /// then this function will panic. It is up to the embedder to ensure that
463    /// this does not happen.
464    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465        tls::get(|vmstore| {
466            fun(Access {
467                store: self.token.as_context_mut(vmstore),
468                get_data: self.get_data,
469            })
470        })
471    }
472
    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    ///
    /// The getter is a plain function pointer and is returned by value.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }
478
479    /// Changes this accessor to access `D2` instead of the current type
480    /// parameter `D`.
481    ///
482    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
483    ///
484    /// # Panics
485    ///
486    /// When using this API the returned value is disconnected from `&self` and
487    /// the lifetime binding the `self` argument. An `Accessor` only works
488    /// within the context of the closure or async closure that it was
489    /// originally given to, however. This means that due to the fact that the
490    /// returned value has no lifetime connection it's possible to use the
491    /// accessor outside of `&self`, the original accessor, and panic.
492    ///
493    /// The returned value should only be used within the scope of the original
494    /// `Accessor` that `self` refers to.
495    pub fn with_getter<D2: HasData>(
496        &self,
497        get_data: fn(&mut T) -> D2::Data<'_>,
498    ) -> Accessor<T, D2> {
499        Accessor {
500            token: self.token,
501            get_data,
502        }
503    }
504
505    /// Spawn a background task which will receive an `&Accessor<T, D>` and
506    /// run concurrently with any other tasks in progress for the current
507    /// store.
508    ///
509    /// This is particularly useful for host functions which return a `stream`
510    /// or `future` such that the code to write to the write end of that
511    /// `stream` or `future` must run after the function returns.
512    ///
513    /// The returned [`JoinHandle`] may be used to cancel the task.
514    ///
515    /// # Panics
516    ///
517    /// Panics if called within a closure provided to the [`Accessor::with`]
518    /// function. This can only be called outside an active invocation of
519    /// [`Accessor::with`].
520    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
521    where
522        T: 'static,
523    {
524        let accessor = self.clone_for_spawn();
525        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
526    }
527
528    fn clone_for_spawn(&self) -> Self {
529        Self {
530            token: self.token,
531            get_data: self.get_data,
532        }
533    }
534
535    /// Polls to see if this store contains any "interesting" tasks still within
536    /// it.
537    ///
538    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
539    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
540    /// the last remaining "interesting" task has exited the provided context's
541    /// waker will be notified. Note that only the waker passed to the last call
542    /// to `poll_no_interesting_tasks` will be notified, so this is only
543    /// appropriate to use one-at-a-time.
544    ///
545    /// The component model specification, as of this current date, does not
546    /// have a distinction between "interesting" tasks and not. The current
547    /// intention is that in a future revision of the component model this will
548    /// be distinguished at the component ABI level where tasks will be able to
549    /// flag themselves as "interesting" optionally. Additionally extra work can
550    /// be opted-in to being "interesting".
551    ///
552    /// For now what this means is that all component model tasks within this
553    /// store are considered interesting. This specifically includes the entire
554    /// duration of a task, so even all of the time after a task has returned
555    /// but before it has exited. This means that this function is, today,
556    /// effectively a proxy for "are there any more tasks still running in this
557    /// store". This can be used by embedders to determine whether there's any
558    /// more work going on, even in the background, for a particular guest.
559    /// Hosts can use this as a signal that the guest wants to stay alive a
560    /// little longer, even after a task has returned.
561    ///
562    /// In the future this predicate won't include all tasks in this store. Some
563    /// tasks will be able to flag themselves as not interesting, meaning that
564    /// when this returns ready it'd be possible that there are still tasks
565    /// remaining in the store.
566    ///
567    /// Note that at this time spawned threads within a task are always
568    /// considered uninteresting. If this function returns ready, then spawned
569    /// threads may still be in the store.
570    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
571        self.with(|mut access| {
572            let store = access.as_context_mut().0;
573            let state = store.concurrent_state_mut();
574            if state.interesting_tasks == 0 {
575                Poll::Ready(())
576            } else {
577                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
578                Poll::Pending
579            }
580        })
581    }
582}
583
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `core::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// The task is consumed and given a borrowed [`Accessor`]; the returned
    /// future must be `Send` and resolves to `Ok(())` or an error.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
602
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw core Wasm values for the call.
        /// NOTE(review): presumably the caller's flattened parameters — confirm.
        params: Vec<ValRaw>,
        /// NOTE(review): presumably whether the call produces a result — confirm.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw core Wasm values for the call.
        /// NOTE(review): presumably the caller's flattened parameters — confirm.
        params: Vec<ValRaw>,
        /// NOTE(review): presumably the number of flat result values — confirm.
        result_count: u32,
    },
}
617
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// Carries a fiber.
    /// NOTE(review): presumably the waiting task's own fiber, to be resumed
    /// when an event arrives — confirm.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    ///
    /// Carries an `Instance`.
    /// NOTE(review): presumably the instance whose callback is invoked —
    /// confirm.
    Callback(Instance),
}
626
/// Represents the reason a fiber is suspending itself.
///
/// NOTE(review): `skip_may_block_check` appears on every variant that names a
/// thread; presumably it disables a "may this task block?" check when the
/// suspension is processed — confirm where the reason is consumed.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The thread associated with the wait.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        /// The thread associated with the yield.
        thread: QualifiedThreadId,
        /// NOTE(review): presumably whether the yield may be interrupted by
        /// cancellation — confirm.
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        /// The thread associated with the suspension.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
653
/// Represents a pending call into guest code for a given guest task.
///
/// `GuestCall::is_ready` treats the variants differently: `DeliverEvent` waits
/// only on the instance's `do_not_enter` flag, `StartImplicit` additionally
/// waits for backpressure to clear, and `StartExplicit` is always ready.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Like `StartImplicit`, except that the closure does not return a
    /// follow-up call.
    /// NOTE(review): the exact distinction between "implicit" and "explicit"
    /// starts is not visible here — confirm at the construction sites.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
675
676impl fmt::Debug for GuestCallKind {
677    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
678        match self {
679            Self::DeliverEvent { instance, set } => f
680                .debug_struct("DeliverEvent")
681                .field("instance", instance)
682                .field("set", set)
683                .finish(),
684            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
685            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
686        }
687    }
688}
689
/// Describes which target, if any, a suspension intrinsic names.
///
/// NOTE(review): the `u32` payloads are presumably thread indices, with
/// `SomeSuspended` naming a thread expected to be suspended already — confirm
/// at the use sites.
#[derive(Debug, Clone, Copy)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// True only for `SuspensionTarget::None`.
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// True for every variant that carries a target.
    fn is_some(&self) -> bool {
        !self.is_none()
    }
}
706
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread the call is for; `is_ready` uses `thread.task` to
    /// find the instance to be entered.
    thread: QualifiedThreadId,
    /// What kind of call is pending.
    kind: GuestCallKind,
}
713
714impl GuestCall {
715    /// Returns whether or not the call is ready to run.
716    ///
717    /// A call will not be ready to run if either:
718    ///
719    /// - the (sub-)component instance to be called has already been entered and
720    /// cannot be reentered until an in-progress call completes
721    ///
722    /// - the call is for a not-yet started task and the (sub-)component
723    /// instance to be called has backpressure enabled
724    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
725        let instance = store
726            .concurrent_state_mut()
727            .get_mut(self.thread.task)?
728            .instance;
729        let state = store.instance_state(instance).concurrent_state();
730
731        let ready = match &self.kind {
732            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
733            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
734            GuestCallKind::StartExplicit(_) => true,
735        };
736        log::trace!(
737            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
738            state.do_not_enter,
739            state.backpressure
740        );
741        Ok(ready)
742    }
743}
744
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure which is given the store and may fail.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
750
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// The `Debug` impl prints only the variant name for `PushFuture`,
/// `ResumeFiber` and `WorkerFunction`; the other variants also print their
/// fields.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber: a one-shot closure which is given the
    /// store and may fail.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
765
766impl fmt::Debug for WorkItem {
767    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
768        match self {
769            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
770            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
771            Self::ResumeThread(instance, thread) => f
772                .debug_tuple("ResumeThread")
773                .field(instance)
774                .field(thread)
775                .finish(),
776            Self::GuestCall(instance, call) => f
777                .debug_tuple("GuestCall")
778                .field(instance)
779                .field(call)
780                .finish(),
781            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
782        }
783    }
784}
785
/// Whether a suspension intrinsic was cancelled or completed
///
/// Note that the derived ordering follows declaration order, i.e.
/// `Cancelled < Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
792
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// The host task is the one reported by `current_host_thread`.  On success the
/// task's state is left as `HostTaskState::CalleeDone { cancelled: false }`
/// and the future's output is returned.
///
/// # Errors
///
/// Returns an error if the future itself fails (when it completes on the first
/// poll), if any table lookup or the suspension fails, or if the task's state
/// is not `CalleeFinished` with a value of type `R` once polling is done.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future in another future which will take care of stashing the
    // result in the host task's `state` (as `HostTaskState::CalleeFinished`)
    // and setting a `Status::Returned` subtask event on the task's waitable
    // when the host task completes.  That event is what the suspended caller
    // below is waiting on.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            // The result is type-erased here and downcast back to `R` at the
            // end of `poll_and_block`.
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It completed immediately; propagate any error.  On success the
        // wrapper above has already stored the result in the task's state,
        // which is retrieved below.
        Poll::Ready(result) => result?,

        // It did not complete immediately; add it to
        // `ConcurrentState::futures` so it will be polled via the event loop;
        // then use `GuestThread::sync_call_set` to wait for the task to
        // complete, suspending the current fiber until it does so.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Remove the `task` from the `sync_call_set` to ensure that when
            // this function returns and the task is deleted that there are no
            // more lingering references to this host task.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Retrieve and return the result, marking the callee as done (and not
    // cancelled) in the process.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
877
878/// Execute the specified guest call.
879fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
880    let mut next = Some(call);
881    while let Some(call) = next.take() {
882        match call.kind {
883            GuestCallKind::DeliverEvent { instance, set } => {
884                let (event, waitable) =
885                    match instance.get_event(store, call.thread.task, set, true)? {
886                        Some(pair) => pair,
887                        None => bail_bug!("delivering non-present event"),
888                    };
889                let state = store.concurrent_state_mut();
890                let task = state.get_mut(call.thread.task)?;
891                let runtime_instance = task.instance;
892                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
893
894                log::trace!(
895                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
896                    call.thread,
897                );
898
899                let old_thread = store.set_thread(call.thread)?;
900                log::trace!(
901                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
902                    call.thread
903                );
904
905                store.enter_instance(runtime_instance);
906
907                let Some(callback) = store
908                    .concurrent_state_mut()
909                    .get_mut(call.thread.task)?
910                    .callback
911                    .take()
912                else {
913                    bail_bug!("guest task callback field not present")
914                };
915
916                let code = callback(store, event, handle)?;
917
918                store
919                    .concurrent_state_mut()
920                    .get_mut(call.thread.task)?
921                    .callback = Some(callback);
922
923                store.exit_instance(runtime_instance)?;
924
925                store.set_thread(old_thread)?;
926
927                next = instance.handle_callback_code(
928                    store,
929                    call.thread,
930                    runtime_instance.index,
931                    code,
932                )?;
933
934                log::trace!(
935                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
936                );
937            }
938            GuestCallKind::StartImplicit(fun) => {
939                next = fun(store)?;
940            }
941            GuestCallKind::StartExplicit(fun) => {
942                fun(store)?;
943            }
944        }
945    }
946
947    Ok(())
948}
949
950impl<T> Store<T> {
951    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
952    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
953    where
954        T: Send + 'static,
955    {
956        ensure!(
957            self.as_context().0.concurrency_support(),
958            "cannot use `run_concurrent` when Config::concurrency_support disabled",
959        );
960        self.as_context_mut().run_concurrent(fun).await
961    }
962
963    #[doc(hidden)]
964    pub fn assert_concurrent_state_empty(&mut self) {
965        self.as_context_mut().assert_concurrent_state_empty();
966    }
967
968    #[doc(hidden)]
969    pub fn concurrent_state_table_size(&mut self) -> usize {
970        self.as_context_mut().concurrent_state_table_size()
971    }
972
973    /// Convenience wrapper for [`StoreContextMut::spawn`].
974    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
975    where
976        T: 'static,
977    {
978        self.as_context_mut().spawn(task)
979    }
980}
981
982impl<T> StoreContextMut<'_, T> {
983    /// Assert that all the relevant tables and queues in the concurrent state
984    /// for this store are empty.
985    ///
986    /// This is for sanity checking in integration tests
987    /// (e.g. `component-async-tests`) that the relevant state has been cleared
988    /// after each test concludes.  This should help us catch leaks, e.g. guest
989    /// tasks which haven't been deleted despite having completed and having
990    /// been dropped by their supertasks.
991    ///
992    /// Only intended for use in Wasmtime's own testing.
993    #[doc(hidden)]
994    pub fn assert_concurrent_state_empty(self) {
995        let store = self.0;
996        store
997            .store_data_mut()
998            .components
999            .assert_instance_states_empty();
1000        let state = store.concurrent_state_mut();
1001        assert!(
1002            state.table.get_mut().is_empty(),
1003            "non-empty table: {:?}",
1004            state.table.get_mut()
1005        );
1006        assert!(state.high_priority.is_empty());
1007        assert!(state.low_priority.is_empty());
1008        assert!(state.current_thread.is_none());
1009        assert!(state.futures_mut().unwrap().is_empty());
1010        assert!(state.global_error_context_ref_counts.is_empty());
1011    }
1012
1013    /// Helper function to perform tests over the size of the concurrent state
1014    /// table which can be useful for detecting leaks.
1015    ///
1016    /// Only intended for use in Wasmtime's own testing.
1017    #[doc(hidden)]
1018    pub fn concurrent_state_table_size(&mut self) -> usize {
1019        self.0
1020            .concurrent_state_mut()
1021            .table
1022            .get_mut()
1023            .iter_mut()
1024            .count()
1025    }
1026
1027    /// Spawn a background task to run as part of this instance's event loop.
1028    ///
1029    /// The task will receive an `&Accessor<U>` and run concurrently with
1030    /// any other tasks in progress for the instance.
1031    ///
1032    /// Note that the task will only make progress if and when the event loop
1033    /// for this instance is run.
1034    ///
1035    /// The returned [`JoinHandle`] may be used to cancel the task.
1036    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1037    where
1038        T: 'static,
1039    {
1040        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1041        self.spawn_with_accessor(accessor, task)
1042    }
1043
1044    /// Internal implementation of `spawn` functions where a `store` is
1045    /// available along with an `Accessor`.
1046    fn spawn_with_accessor<D>(
1047        self,
1048        accessor: Accessor<T, D>,
1049        task: impl AccessorTask<T, D>,
1050    ) -> JoinHandle
1051    where
1052        T: 'static,
1053        D: HasData + ?Sized,
1054    {
1055        // Create an "abortable future" here where internally the future will
1056        // hook calls to poll and possibly spawn more background tasks on each
1057        // iteration.
1058        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1059        self.0
1060            .concurrent_state_mut()
1061            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1062        handle
1063    }
1064
1065    /// Run the specified closure `fun` to completion as part of this store's
1066    /// event loop.
1067    ///
1068    /// This will run `fun` as part of this store's event loop until it
1069    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1070    /// controlled access to the store and its data.
1071    ///
1072    /// This function can be used to invoke [`Func::call_concurrent`] for
1073    /// example within the async closure provided here.
1074    ///
1075    /// This function will unconditionally return an error if
1076    /// [`Config::concurrency_support`] is disabled.
1077    ///
1078    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1079    ///
1080    /// # Store-blocking behavior
1081    ///
1082    /// At this time there are certain situations in which the `Future` returned
1083    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1084    /// extended period of time, despite one or more `Waker::wake` events having
1085    /// occurred for the task to which it belongs.  This can manifest as the
1086    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1087    /// the `Store` being held by e.g. a blocking host function, preventing the
1088    /// `Future` from being polled. A canonical example of this is when the
1089    /// `fun` provided to this function attempts to set a timeout for an
1090    /// invocation of a wasm function. In this situation the async closure is
1091    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1092    /// to elapse. At this time this setup will not always work and the timeout
1093    /// may not reliably fire.
1094    ///
1095    /// This function will not block the current thread and as such is always
1096    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress before the closure can make progress. This is an
1099    /// artifact of Wasmtime's historical implementation of `async` functions
1100    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1101    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1102    /// progress for a readiness notification of (b) to get delivered.
1103    ///
1104    /// This effectively means that it's not possible to reliably perform a
1105    /// "select" operation within the `fun` closure, which timeouts for example
1106    /// are based on. Fixing this requires some relatively major refactoring
1107    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1108    /// that is intended to be fixed one day. In the meantime it's recommended
1109    /// to apply timeouts or such to the entire `run_concurrent` call itself
1110    /// rather than internally.
1111    ///
1112    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1113    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1114    ///
1115    /// # Example
1116    ///
1117    /// ```
1118    /// # use {
1119    /// #   wasmtime::{
1120    /// #     error::{Result},
1121    /// #     component::{ Component, Linker, Resource, ResourceTable},
1122    /// #     Config, Engine, Store
1123    /// #   },
1124    /// # };
1125    /// #
1126    /// # struct MyResource(u32);
1127    /// # struct Ctx { table: ResourceTable }
1128    /// #
1129    /// # async fn foo() -> Result<()> {
1130    /// # let mut config = Config::new();
1131    /// # let engine = Engine::new(&config)?;
1132    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1133    /// # let mut linker = Linker::new(&engine);
1134    /// # let component = Component::new(&engine, "")?;
1135    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1136    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1137    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1138    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1139    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1140    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1141    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1142    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1143    ///    Ok(())
1144    /// }).await??;
1145    /// # Ok(())
1146    /// # }
1147    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        // Reject the call up front with an error (rather than panicking) when
        // concurrency support is disabled in the `Config`.
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        // `false` here means `trap_on_idle` is off: if the event loop runs out
        // of work it waits for `fun`'s future to wake it up.
        self.do_run_concurrent(fun, false).await
    }
1158
    /// Variant of `run_concurrent` which passes `trap_on_idle = true` to the
    /// event loop, so that it fails with `Trap::AsyncDeadlock` when there is
    /// no remaining work and `fun`'s future still hasn't completed.
    ///
    /// Unlike `run_concurrent`, this does not return an error when
    /// concurrency support is disabled; `do_run_concurrent` only
    /// `debug_assert!`s that it is enabled.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1168
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates an `Accessor` for this store, calls `fun` with it to obtain a
    /// future, and drives that future with `poll_until`.  `trap_on_idle` is
    /// forwarded to `poll_until` unchanged.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        // NOTE(review): presumably guards against running the event loop
        // recursively -- confirm against `check_recursive_run`.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the store context together with the future returned by `fun`,
        // and makes sure the future is dropped inside `tls::set` so that the
        // store is installed while the future's destructor runs.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // Pin the future in place inside `dropper` rather than boxing it.
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1212
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration of the loop polls `future`, then the store's pending
    /// host futures, and then processes either all queued high-priority work
    /// items or a single low-priority one.
    ///
    /// # Errors
    ///
    /// Returns an error if any host future or work item fails, or
    /// `Trap::AsyncDeadlock` if `trap_on_idle` is true and there is nothing
    /// left to do while `future` is still pending.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts the taken `futures` collection back into the
        // store's concurrent state when dropped.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Outcome of one round of polling: either the caller's future
            // finished, or there is a (possibly empty) batch of work items to
            // process before polling again.
            enum PollResult<R> {
                Complete(R),
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The resulting `next` is `Ready(true)` if a future completed
                // successfully, `Ready(false)` if there are no futures left,
                // and `Pending` otherwise.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if
                // any.  This will be either all of the high-priority work
                // items, or if there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard which cleans up any work items that were collected
                    // but not handled, e.g. because an earlier item failed.
                    // Unhandled fibers are disposed and unhandled futures are
                    // dropped inside `tls::set`; other items are simply
                    // dropped.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // If we're about to run a low-priority task, first yield to
                    // the executor.  This ensures that it won't be starved of
                    // the ability to e.g. update the readiness of sockets,
                    // etc. which the guest may be using `thread.yield` along
                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
                    //
                    // This works for e.g. `thread.yield` and callbacks which
                    // return `CALLBACK_CODE_YIELD` because we queue a low
                    // priority item to resume the task (i.e. resume the thread
                    // or call the callback, respectively) just prior to
                    // suspending it.  Indeed, as of this writing those are the
                    // _only_ situations we queue low-priority tasks.
                    // Therefore, we interpret the guest's request to yield as
                    // meaning "yield to other guest tasks _and_/_or_ host
                    // operations such as updating socket readiness", the latter
                    // being the async runtime's responsibility.
                    //
                    // In the future, if this ends up causing measurable
                    // performance issues, this could be optimized such that we
                    // only yield periodically (e.g. for batches of low priority
                    // items) and not for each and every individual item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1427
1428    /// Handle the specified work item, possibly resuming a fiber if applicable.
1429    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1430    where
1431        T: Send,
1432    {
1433        log::trace!("handle work item {item:?}");
1434        match item {
1435            WorkItem::PushFuture(future) => {
1436                self.0
1437                    .concurrent_state_mut()
1438                    .futures_mut()?
1439                    .push(future.into_inner());
1440            }
1441            WorkItem::ResumeFiber(fiber) => {
1442                self.0.resume_fiber(fiber).await?;
1443            }
1444            WorkItem::ResumeThread(_, thread) => {
1445                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1446                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1447                    GuestThreadState::Running,
1448                ) {
1449                    self.0.resume_fiber(fiber).await?;
1450                } else {
1451                    bail_bug!("cannot resume non-pending thread {thread:?}");
1452                }
1453            }
1454            WorkItem::GuestCall(_, call) => {
1455                if call.is_ready(self.0)? {
1456                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1457                } else {
1458                    let state = self.0.concurrent_state_mut();
1459                    let task = state.get_mut(call.thread.task)?;
1460                    if !task.starting_sent {
1461                        task.starting_sent = true;
1462                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1463                            Waitable::Guest(call.thread.task).set_event(
1464                                state,
1465                                Some(Event::Subtask {
1466                                    status: Status::Starting,
1467                                }),
1468                            )?;
1469                        }
1470                    }
1471
1472                    let instance = state.get_mut(call.thread.task)?.instance;
1473                    self.0
1474                        .instance_state(instance)
1475                        .concurrent_state()
1476                        .pending
1477                        .insert(call.thread, call.kind);
1478                }
1479            }
1480            WorkItem::WorkerFunction(fun) => {
1481                self.run_on_worker(WorkerItem::Function(fun)).await?;
1482            }
1483        }
1484
1485        Ok(())
1486    }
1487
1488    /// Execute the specified guest call on a worker fiber.
1489    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1490    where
1491        T: Send,
1492    {
1493        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1494            fiber
1495        } else {
1496            fiber::make_fiber(self.0, move |store| {
1497                loop {
1498                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1499                        bail_bug!("worker_item not present when resuming fiber")
1500                    };
1501                    match item {
1502                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1503                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1504                    }
1505
1506                    store.suspend(SuspendReason::NeedWork)?;
1507                }
1508            })?
1509        };
1510
1511        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1512        assert!(worker_item.is_none());
1513        *worker_item = Some(item);
1514
1515        self.0.resume_fiber(worker).await
1516    }
1517
1518    /// Wrap the specified host function in a future which will call it, passing
1519    /// it an `&Accessor<T>`.
1520    ///
1521    /// See the `Accessor` documentation for details.
1522    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1523    where
1524        T: 'static,
1525        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1526            + Send
1527            + Sync
1528            + 'static,
1529        R: Send + Sync + 'static,
1530    {
1531        let token = StoreToken::new(self);
1532        async move {
1533            let mut accessor = Accessor::new(token);
1534            closure(&mut accessor).await
1535        }
1536    }
1537
1538    /// Returns an iterator over the current async call stack defined by the
1539    /// component model.
1540    ///
1541    /// This can be used, for example to correlate a host import call with which
1542    /// root export task originally called it.
1543    ///
1544    /// Tasks are yielded "youngest first" where the first item in the iterator
1545    /// is the current task, and the last item in the iterator is the original
1546    /// call.
1547    pub fn async_call_stack(&mut self) -> impl Iterator<Item = GuestTaskId> {
1548        let state = self.0.concurrent_state_mut();
1549        let mut cur = Some(state.current_thread);
1550        core::iter::from_fn(move || {
1551            while let Some(t) = cur {
1552                cur = state.parent(t);
1553                if let Some(thread) = t.guest() {
1554                    return Some(GuestTaskId(thread.task));
1555                }
1556            }
1557
1558            None
1559        })
1560    }
1561}
1562
1563impl StoreOpaque {
1564    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1565    /// guest-to-guest call or a sync host-to-guest call.
1566    ///
1567    /// This task will only be used for the purpose of handling calls to
1568    /// intrinsic functions; both parameter lowering and result lifting are
1569    /// assumed to be taken care of elsewhere.
1570    pub(crate) fn enter_guest_sync_call(
1571        &mut self,
1572        guest_caller: Option<RuntimeInstance>,
1573        callee_async: bool,
1574        callee: RuntimeInstance,
1575    ) -> Result<()> {
1576        log::trace!("enter sync call {callee:?}");
1577        if !self.concurrency_support() {
1578            return self.enter_call_not_concurrent();
1579        }
1580
1581        let state = self.concurrent_state_mut();
1582        let thread = state.current_thread;
1583        let instance = if let Some(thread) = thread.guest() {
1584            Some(state.get_mut(thread.task)?.instance)
1585        } else {
1586            None
1587        };
1588        if guest_caller.is_some() {
1589            debug_assert_eq!(instance, guest_caller);
1590        }
1591        let guest_thread = GuestTask::new(
1592            state,
1593            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1594            LiftResult {
1595                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1596                ty: TypeTupleIndex::reserved_value(),
1597                memory: None,
1598                string_encoding: StringEncoding::Utf8,
1599            },
1600            if let Some(thread) = thread.guest() {
1601                Caller::Guest { thread: *thread }
1602            } else {
1603                Caller::Host {
1604                    tx: None,
1605                    host_future_present: false,
1606                    caller: thread,
1607                }
1608            },
1609            None,
1610            callee,
1611            callee_async,
1612        )?;
1613
1614        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1615            guest_thread.thread,
1616            self,
1617            callee.index,
1618        )?;
1619        self.set_thread(guest_thread)?;
1620
1621        Ok(())
1622    }
1623
1624    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1625    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1626        if !self.concurrency_support() {
1627            return Ok(self.exit_call_not_concurrent());
1628        }
1629        let thread = match self.set_thread(CurrentThread::None)?.guest() {
1630            Some(t) => *t,
1631            None => bail_bug!("expected task when exiting"),
1632        };
1633        let task = self.concurrent_state_mut().get_mut(thread.task)?;
1634        let instance = task.instance;
1635        let caller = match &task.caller {
1636            &Caller::Guest { thread } => thread.into(),
1637            &Caller::Host { caller, .. } => caller,
1638        };
1639        task.lift_result = None;
1640        task.exited = true;
1641        self.set_thread(caller)?;
1642
1643        log::trace!("exit sync call {instance:?}");
1644        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1645
1646        Ok(())
1647    }
1648
1649    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1650    /// transition to the host.
1651    ///
1652    /// FIXME: this is called for all guest->host transitions and performs some
1653    /// relatively expensive table manipulations. This would ideally be
1654    /// optimized to avoid the full allocation of a `HostTask` in at least some
1655    /// situations.
1656    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1657        if !self.concurrency_support() {
1658            self.enter_call_not_concurrent()?;
1659            return Ok(None);
1660        }
1661        let state = self.concurrent_state_mut();
1662        let caller = state.current_guest_thread()?;
1663        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1664        log::trace!("new host task {task:?}");
1665        self.set_thread(task)?;
1666        Ok(Some(task))
1667    }
1668
1669    /// Invoked before lowering the results of a host task to the guest.
1670    ///
1671    /// This is used to update the current thread annotations within the store
1672    /// to ensure that it reflects the guest task, not the host task, since
1673    /// lowering may execute guest code.
1674    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1675        if !self.concurrency_support() {
1676            return Ok(());
1677        }
1678        let task = self.concurrent_state_mut().current_host_thread()?;
1679        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1680        self.set_thread(caller)?;
1681        Ok(())
1682    }
1683
1684    /// Dual of `host_task_create` and signifies that the host has finished and
1685    /// will be cleaned up.
1686    ///
1687    /// Note that this isn't invoked when the host is invoked asynchronously and
1688    /// the host isn't complete yet. In that situation the host task persists
1689    /// and will be cleaned up separately in `subtask_drop`
1690    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1691        match task {
1692            Some(task) => {
1693                log::trace!("delete host task {task:?}");
1694                self.concurrent_state_mut().delete(task)?;
1695            }
1696            None => {
1697                self.exit_call_not_concurrent();
1698            }
1699        }
1700        Ok(())
1701    }
1702
1703    /// Determine whether the specified instance may be entered from the host.
1704    ///
1705    /// We return `true` here only if all of the following hold:
1706    ///
1707    /// - The top-level instance is not already on the current task's call stack.
1708    /// - The instance is not in need of a post-return function call.
1709    /// - `self` has not been poisoned due to a trap.
1710    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1711        if self.trapped() {
1712            return Ok(false);
1713        }
1714        if !self.concurrency_support() {
1715            return Ok(true);
1716        }
1717        let state = self.concurrent_state_mut();
1718        let mut cur = Some(state.current_thread);
1719        while let Some(t) = cur {
1720            if let Some(thread) = t.guest() {
1721                let task = state.get_mut(thread.task)?;
1722                // Note that we only compare top-level instance IDs here.
1723                // The idea is that the host is not allowed to recursively
1724                // enter a top-level instance even if the specific leaf
1725                // instance is not on the stack. This the behavior defined
1726                // in the spec, and it allows us to elide runtime checks in
1727                // guest-to-guest adapters.
1728                if task.instance.instance == instance.instance {
1729                    return Ok(false);
1730                }
1731            }
1732            cur = state.parent(t);
1733        }
1734        Ok(true)
1735    }
1736
1737    /// Helper function to retrieve the `InstanceState` for the
1738    /// specified instance.
1739    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1740        self.component_instance_mut(instance.instance)
1741            .instance_state(instance.index)
1742    }
1743
    /// Configure the currently running `thread`.
    ///
    /// This will save off any state necessary for the previous thread, if
    /// applicable, and then it'll additionally update state for `thread` if
    /// needed too.
    ///
    /// Returns the thread which was current before this call, so callers can
    /// restore it later with another call to `set_thread`.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // First thing to do after swapping threads is updating the context
        // slots for this thread within the store. This restores the behavior of
        // `context.{get,set}`. This involves taking the old state out of the
        // store, saving it in the thread that's being swapped from, and doing
        // the inverse for the new thread. When debug assertions are enabled
        // this also leaves behind sentinel values to try to uncover bugs where
        // this may be forgotten.
        if let Some(old_thread) = old_thread.guest() {
            // Save the outgoing guest thread's context slots.
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        if cfg!(debug_assertions) {
            // Sentinel: overwritten below if the new thread is a guest thread.
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        if let Some(thread) = thread.guest() {
            // Load the incoming guest thread's context slots into the store,
            // leaving a sentinel behind in the thread in debug builds.
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Each time we switch threads, we conservatively set `task_may_block`
        // to `false` for the component instance we're switching away from (if
        // any), meaning it will be `false` for any new thread created for that
        // instance unless explicitly set otherwise.
        //
        // Additionally if we're switching to a new thread, set its component
        // instance's `task_may_block` according to where it left off.
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1799
1800    /// Set the global variable representing whether the current task may block
1801    /// prior to entering Wasm code.
1802    fn set_task_may_block(&mut self) -> Result<()> {
1803        let state = self.concurrent_state_mut();
1804        let guest_thread = state.current_guest_thread()?;
1805        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1806        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1807        self.component_instance_mut(instance)
1808            .set_task_may_block(may_block);
1809        Ok(())
1810    }
1811
1812    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1813        if !self.concurrency_support() {
1814            return Ok(());
1815        }
1816        let state = self.concurrent_state_mut();
1817        let task = state.current_guest_thread()?.task;
1818        let instance = state.get_mut(task)?.instance.instance;
1819        let task_may_block = self.component_instance(instance).get_task_may_block();
1820
1821        if task_may_block {
1822            Ok(())
1823        } else {
1824            Err(Trap::CannotBlockSyncTask.into())
1825        }
1826    }
1827
1828    /// Record that we're about to enter a (sub-)component instance which does
1829    /// not support more than one concurrent, stackful activation, meaning it
1830    /// cannot be entered again until the next call returns.
1831    fn enter_instance(&mut self, instance: RuntimeInstance) {
1832        log::trace!("enter {instance:?}");
1833        self.instance_state(instance)
1834            .concurrent_state()
1835            .do_not_enter = true;
1836    }
1837
1838    /// Record that we've exited a (sub-)component instance previously entered
1839    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1840    /// See the documentation for the latter for details.
1841    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1842        log::trace!("exit {instance:?}");
1843        self.instance_state(instance)
1844            .concurrent_state()
1845            .do_not_enter = false;
1846        self.partition_pending(instance)
1847    }
1848
1849    /// Iterate over `InstanceState::pending`, moving any ready items into the
1850    /// "high priority" work item queue.
1851    ///
1852    /// See `GuestCall::is_ready` for details.
1853    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1854        for (thread, kind) in
1855            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1856        {
1857            let call = GuestCall { thread, kind };
1858            if call.is_ready(self)? {
1859                self.concurrent_state_mut()
1860                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
1861            } else {
1862                self.instance_state(instance)
1863                    .concurrent_state()
1864                    .pending
1865                    .insert(call.thread, call.kind);
1866            }
1867        }
1868
1869        Ok(())
1870    }
1871
1872    /// Implements the `backpressure.{inc,dec}` intrinsics.
1873    pub(crate) fn backpressure_modify(
1874        &mut self,
1875        caller_instance: RuntimeInstance,
1876        modify: impl FnOnce(u16) -> Option<u16>,
1877    ) -> Result<()> {
1878        let state = self.instance_state(caller_instance).concurrent_state();
1879        let old = state.backpressure;
1880        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1881        state.backpressure = new;
1882
1883        if old > 0 && new == 0 {
1884            // Backpressure was previously enabled and is now disabled; move any
1885            // newly-eligible guest calls to the "high priority" queue.
1886            self.partition_pending(caller_instance)?;
1887        }
1888
1889        Ok(())
1890    }
1891
    /// Resume the specified fiber, giving it exclusive access to the specified
    /// store.
    ///
    /// The thread which was current when this function was called is restored
    /// as the current thread once the fiber hands control back.  If the fiber
    /// suspended rather than exited, it is stored according to the
    /// `SuspendReason` it left in `ConcurrentState::suspend_reason`.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        let old_thread = self.concurrent_state_mut().current_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // `Some` is handled below as "fiber suspended", `None` as "fiber
        // exited".
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        self.set_thread(old_thread)?;

        let state = self.concurrent_state_mut();

        if let Some(ot) = old_thread.guest() {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // See the `SuspendReason` documentation for what each case means.
            let reason = match state.suspend_reason.take() {
                Some(r) => r,
                None => bail_bug!("suspend reason missing when resuming fiber"),
            };
            match reason {
                SuspendReason::NeedWork => {
                    // Park the fiber as the reusable worker unless one is
                    // already parked, in which case this one is disposed of.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding {
                    thread,
                    cancellable,
                    ..
                } => {
                    // Mark the thread ready and queue a low-priority work item
                    // to resume it.
                    state.get_mut(thread.thread)?.state =
                        GuestThreadState::Ready { fiber, cancellable };
                    let instance = state.get_mut(thread.task)?.instance.index;
                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    // No work item is queued here; the fiber is only stored in
                    // the thread's state.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    // Register the fiber as the waiter for `thread` on `set`;
                    // there must not already be one.
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1951
    /// Suspend the current fiber, storing the reason in
    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
    /// it should be resumed.
    ///
    /// When suspending on behalf of a guest thread, that thread is made the
    /// current thread again once the fiber has been resumed.
    ///
    /// See the `SuspendReason` documentation for details.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // Every reason except `NeedWork` is on behalf of a guest thread; in
        // those cases we remember the current thread here so it can be
        // reinstalled after the fiber resumes.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        let old_guest_thread = if task.is_some() {
            self.concurrent_state_mut().current_thread
        } else {
            CurrentThread::None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        debug_assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .guest()
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .transpose()?
                .unwrap_or(true)
        );

        // Publish the reason for `resume_fiber` to pick up; a reason left over
        // from an earlier suspend would indicate a bug.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Execution continues here after the fiber is resumed.
        if task.is_some() {
            self.set_thread(old_guest_thread)?;
        }

        Ok(())
    }
2013
2014    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2015        let state = self.concurrent_state_mut();
2016
2017        if waitable.common(state)?.set.is_some() {
2018            bail!(Trap::WaitableSyncAndAsync);
2019        }
2020
2021        let caller = state.current_guest_thread()?;
2022        let set = state.get_mut(caller.thread)?.sync_call_set;
2023        waitable.join(state, Some(set))?;
2024        self.suspend(SuspendReason::Waiting {
2025            set,
2026            thread: caller,
2027            skip_may_block_check: false,
2028        })?;
2029        let state = self.concurrent_state_mut();
2030        waitable.join(state, None)
2031    }
2032
    /// Cleans up the data structures backing the `guest_thread` specified,
    /// removing it from the internal tables of `runtime_instance` as well.
    ///
    /// This function is used whenever a guest thread has fully exited and
    /// completed. This'll clean up the associated `GuestThread` structure and
    /// related resources it contains.
    ///
    /// Other functionality that this implements is:
    ///
    /// * This will perform conditional cleanup of the `GuestTask` that owns
    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
    /// * If there are no more threads in the `GuestTask` that this thread is
    ///   associated with, and if the task hasn't produced a result (e.g. it's not
    ///   returned or cancelled), then a trap will be raised that a result
    ///   wasn't ever produced.
    /// * If this task is finished, meaning the top-level thread exited and
    ///   additionally it's been returned or cancelled, then this will handle
    ///   management of the store's "active interesting tasks" counter.
    ///
    /// Effectively this is intended to be a "narrow waist" through which many
    /// destruction operations are funneled through.
    fn cleanup_thread(
        &mut self,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeInstance,
        cleanup_task: CleanupTask,
    ) -> Result<()> {
        let state = self.concurrent_state_mut();
        let thread_data = state.get_mut(guest_thread.thread)?;
        let sync_call_set = thread_data.sync_call_set;
        // Remove the thread's handle from the instance's table, if it has one.
        if let Some(guest_id) = thread_data.instance_rep {
            self.instance_state(runtime_instance)
                .thread_handle_table()
                .guest_thread_remove(guest_id)?;
        }
        let state = self.concurrent_state_mut();

        // Clean up any pending subtasks in the sync_call_set
        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        // Delete the thread and its set, then detach the thread from its task.
        state.delete(guest_thread.thread)?;
        state.delete(sync_call_set)?;
        let task = state.get_mut(guest_thread.task)?;
        task.threads.remove(&guest_thread.thread);

        // The last thread of a task exiting without a result is a trap.
        if task.threads.is_empty() && !task.returned_or_cancelled() {
            bail!(Trap::NoAsyncResult);
        }
        let ready_to_delete = task.ready_to_delete();

        // Decrement the interesting-task counter at most once per task, waking
        // the registered waker (if any) when the counter reaches zero.
        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
            task.decremented_interesting_task_count = true;

            debug_assert!(state.interesting_tasks > 0);
            state.interesting_tasks -= 1;
            if state.interesting_tasks == 0
                && let Some(waker) = state.interesting_tasks_empty_waker.take()
            {
                waker.wake();
            }
        }

        match cleanup_task {
            CleanupTask::Yes => {
                if ready_to_delete {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
            CleanupTask::No => {}
        }

        Ok(())
    }
2113
    /// Performs cancellation of the `guest_task` specified with the
    /// precondition that the task hasn't lowered its parameters.
    ///
    /// In this situation the task hasn't ever been started meaning it hasn't
    /// actually run any wasm code yet. This requires cleaning up metadata such
    /// as thread information attached to the task.
    ///
    /// The main two entrypoints for this function are:
    ///
    /// * Task cancellation via `subtask.cancel`, the intrinsic.
    /// * Dropping a host `call_async` future which needs to cancel the task
    ///   because it cannot reference its parameters any more.
    ///
    /// Returns `Trap::SubtaskCancelAfterTerminal` if the task had no entry in
    /// its instance's pending set.
    ///
    /// # Panics
    ///
    /// Panics if the task has already lowered its parameters or does not have
    /// exactly one thread.
    fn cancel_guest_subtask_without_lowered_parameters(
        &mut self,
        caller_instance: RuntimeInstance,
        guest_task: TableId<GuestTask>,
    ) -> Result<()> {
        let concurrent_state = self.concurrent_state_mut();
        let task = concurrent_state.get_mut(guest_task)?;
        assert!(!task.already_lowered_parameters());
        // The task is in a `starting` state, meaning it hasn't run at
        // all yet.  Here we update its fields to indicate that it is
        // ready to delete immediately once `subtask.drop` is called.
        task.lower_params = None;
        task.lift_result = None;
        task.exited = true;
        let instance = task.instance;

        // Clean up the thread within this task as it's now never going
        // to run.
        assert_eq!(1, task.threads.len());
        let thread = *task.threads.iter().next().unwrap();
        // `CleanupTask::No`: the task itself is kept here even if it is ready
        // to delete.
        self.cleanup_thread(
            QualifiedThreadId {
                task: guest_task,
                thread,
            },
            caller_instance,
            CleanupTask::No,
        )?;

        // Not yet started; cancel and remove from pending
        let pending = &mut self.instance_state(instance).concurrent_state().pending;
        let pending_count = pending.len();
        pending.retain(|thread, _| thread.task != guest_task);
        // If there were no pending threads for this task, we're in an error state
        if pending.len() == pending_count {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
        Ok(())
    }
2165}
2166
/// Tells `StoreOpaque::cleanup_thread` whether it may also delete the
/// `GuestTask` owning the thread being cleaned up.
enum CleanupTask {
    /// Delete the owning task as well, provided it is ready to be deleted.
    Yes,
    /// Leave the owning task in place.
    No,
}
2171
2172impl Instance {
2173    /// Get the next pending event for the specified task and (optional)
2174    /// waitable set, along with the waitable handle if applicable.
2175    fn get_event(
2176        self,
2177        store: &mut StoreOpaque,
2178        guest_task: TableId<GuestTask>,
2179        set: Option<TableId<WaitableSet>>,
2180        cancellable: bool,
2181    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2182        let state = store.concurrent_state_mut();
2183
2184        let event = &mut state.get_mut(guest_task)?.event;
2185        if let Some(ev) = event
2186            && (cancellable || !matches!(ev, Event::Cancelled))
2187        {
2188            log::trace!("deliver event {ev:?} to {guest_task:?}");
2189            let ev = *ev;
2190            *event = None;
2191            return Ok(Some((ev, None)));
2192        }
2193
2194        let set = match set {
2195            Some(set) => set,
2196            None => return Ok(None),
2197        };
2198        let waitable = match state.get_mut(set)?.ready.pop_first() {
2199            Some(v) => v,
2200            None => return Ok(None),
2201        };
2202
2203        let common = waitable.common(state)?;
2204        let handle = match common.handle {
2205            Some(h) => h,
2206            None => bail_bug!("handle not set when delivering event"),
2207        };
2208        let event = match common.event.take() {
2209            Some(e) => e,
2210            None => bail_bug!("event not set when delivering event"),
2211        };
2212
2213        log::trace!(
2214            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2215        );
2216
2217        waitable.on_delivery(store, self, event)?;
2218
2219        Ok(Some((event, Some((waitable, handle)))))
2220    }
2221
2222    /// Handle the `CallbackCode` returned from an async-lifted export or its
2223    /// callback.
2224    ///
2225    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2226    /// using `handle_guest_call`.
2227    fn handle_callback_code(
2228        self,
2229        store: &mut StoreOpaque,
2230        guest_thread: QualifiedThreadId,
2231        runtime_instance: RuntimeComponentInstanceIndex,
2232        code: u32,
2233    ) -> Result<Option<GuestCall>> {
2234        let (code, set) = unpack_callback_code(code);
2235
2236        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2237
2238        let state = store.concurrent_state_mut();
2239
2240        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2241            let set = store
2242                .instance_state(self.runtime_instance(runtime_instance))
2243                .handle_table()
2244                .waitable_set_rep(handle)?;
2245
2246            Ok(TableId::<WaitableSet>::new(set))
2247        };
2248
2249        Ok(match code {
2250            callback_code::EXIT => {
2251                log::trace!("implicit thread {guest_thread:?} completed");
2252                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2253                task.exited = true;
2254                task.callback = None;
2255                store.cleanup_thread(
2256                    guest_thread,
2257                    self.runtime_instance(runtime_instance),
2258                    CleanupTask::Yes,
2259                )?;
2260                None
2261            }
2262            callback_code::YIELD => {
2263                let task = state.get_mut(guest_thread.task)?;
2264                // If an `Event::Cancelled` is pending, we'll deliver that;
2265                // otherwise, we'll deliver `Event::None`.  Note that
2266                // `GuestTask::event` is only ever set to one of those two
2267                // `Event` variants.
2268                if let Some(event) = task.event {
2269                    assert!(matches!(event, Event::None | Event::Cancelled));
2270                } else {
2271                    task.event = Some(Event::None);
2272                }
2273                let call = GuestCall {
2274                    thread: guest_thread,
2275                    kind: GuestCallKind::DeliverEvent {
2276                        instance: self,
2277                        set: None,
2278                    },
2279                };
2280                if state.may_block(guest_thread.task)? {
2281                    // Push this thread onto the "low priority" queue so it runs
2282                    // after any other threads have had a chance to run.
2283                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2284                    None
2285                } else {
2286                    // Yielding in a non-blocking context is defined as a no-op
2287                    // according to the spec, so we must run this thread
2288                    // immediately without allowing any others to run.
2289                    Some(call)
2290                }
2291            }
2292            callback_code::WAIT => {
2293                // The task may only return `WAIT` if it was created for a call
2294                // to an async export).  Otherwise, we'll trap.
2295                state.check_blocking_for(guest_thread.task)?;
2296
2297                let set = get_set(store, set)?;
2298                let state = store.concurrent_state_mut();
2299
2300                if state.get_mut(guest_thread.task)?.event.is_some()
2301                    || !state.get_mut(set)?.ready.is_empty()
2302                {
2303                    // An event is immediately available; deliver it ASAP.
2304                    state.push_high_priority(WorkItem::GuestCall(
2305                        runtime_instance,
2306                        GuestCall {
2307                            thread: guest_thread,
2308                            kind: GuestCallKind::DeliverEvent {
2309                                instance: self,
2310                                set: Some(set),
2311                            },
2312                        },
2313                    ));
2314                } else {
2315                    // No event is immediately available.
2316                    //
2317                    // We're waiting, so register to be woken up when an event
2318                    // is published for this waitable set.
2319                    //
2320                    // Here we also set `GuestTask::wake_on_cancel` which allows
2321                    // `subtask.cancel` to interrupt the wait.
2322                    let old = state
2323                        .get_mut(guest_thread.thread)?
2324                        .wake_on_cancel
2325                        .replace(set);
2326                    if !old.is_none() {
2327                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2328                    }
2329                    let old = state
2330                        .get_mut(set)?
2331                        .waiting
2332                        .insert(guest_thread, WaitMode::Callback(self));
2333                    if !old.is_none() {
2334                        bail_bug!("set's waiting set already had this thread registered");
2335                    }
2336                }
2337                None
2338            }
2339            _ => bail!(Trap::UnsupportedCallbackCode),
2340        })
2341    }
2342
    /// Add the specified guest call to the "high priority" work item queue, to
    /// be started as soon as backpressure and/or reentrance rules allow.
    ///
    /// The queued work item is a `GuestCallKind::StartImplicit` closure for
    /// `guest_thread`, keyed by the index of the instance recorded on the
    /// thread's task.  Two flavors of closure are built:
    ///
    /// - If `callback` is present (which requires `async_`), the closure runs
    ///   `callee`, interprets its first result as a callback code, and hands
    ///   that code to `handle_callback_code`.
    /// - Otherwise the closure runs `callee` and, when `async_` is `false`,
    ///   also lifts the result, calls `post_return` (if any), and completes
    ///   the task with `Status::Returned`.  In both cases it then marks the
    ///   task as exited and cleans up the implicit thread.
    ///
    /// `param_count` and `result_count` are the numbers of flat core Wasm
    /// parameters and results of `callee`.
    ///
    /// SAFETY: The raw pointer arguments must be valid references to guest
    /// functions (with the appropriate signatures) when the closures queued by
    /// this function are called.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Return a closure which will call the specified function in the scope
        /// of the specified task.
        ///
        /// This will use `GuestTask::lower_params` to lower the parameters, but
        /// will not lift the result; instead, it returns a
        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
        /// any, may be lifted.  Note that an async-lifted export will have
        /// returned its result using the `task.return` intrinsic (or not
        /// returned a result at all, in the case of `task.cancel`), in which
        /// case the "result" of this call will either be a callback code or
        /// nothing.
        ///
        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
        /// the returned closure is called.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                // Mark the thread as running before any guest code executes.
                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                // The lowering closure is consumed here (`take`), so it is an
                // internal bug for it to be absent at this point.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                // The slice handed to the callee spans
                // `max(param_count, result_count)` slots of `storage`: the
                // parameters were lowered into its front above, and callers of
                // this closure read the results back from its front.
                //
                // SAFETY: Per the contract documented in `make_call`'s
                // documentation, `callee` must be a valid pointer.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        // SAFETY: Per the contract described in this function documentation,
        // the `callee` pointer which `call` closes over must be valid when
        // called by the closure we queue below.
        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        // The instance the call runs in is the one recorded on the task.
        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // A callback is only meaningful for an async-lifted export.
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                // Restore whichever thread was current before the call and,
                // if it was a guest thread, mark it as running again.
                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // SAFETY: `wasmparser` will have validated that the callback
                // function returns a `i32` result.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                // Fetched up front; only used by `call_post_return` below.
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Unless this is a callback-less (i.e. stackful)
                // async-lifted export, we need to record that the instance
                // cannot be entered until the call returns.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                if !async_ {
                    // This is a sync-lifted export, so now is when we lift the
                    // result, optionally call the post-return function, if any,
                    // and finally notify any current or future waiters that the
                    // subtask has returned.

                    let lift = {
                        store.exit_instance(callee_instance)?;

                        // A result must not already be recorded for the task,
                        // and the lifting closure must still be present.
                        let state = store.concurrent_state_mut();
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // SAFETY: `result_count` represents the number of core Wasm
                    // results returned, per `wasmparser`.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // The post-return function receives the single flat result
                    // of the callee, or a zero placeholder if there was none.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        // SAFETY: `result_count` represents the number of
                        // core Wasm results returned, per `wasmparser`.
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    // SAFETY: NOTE(review): presumably justified by
                    // `queue_call`'s contract that its raw pointer arguments
                    // (here `post_return`) are valid guest function references
                    // when this closure runs -- confirm against
                    // `call_post_return`'s own requirements.
                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                store.set_thread(old_thread)?;

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .exited = true;

                // This is a callback-less call, so the implicit thread has now completed
                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
                Ok(None)
            })
        };

        // Hand the closure to the event loop; it is not run here.
        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2583
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// A new guest task is created whose caller is the current guest thread
    /// and whose instance is `callee_instance`; its parameter-lowering closure
    /// wraps `start` and its result-lifting closure wraps `return_`.  On
    /// success the new task's thread is made the current thread.
    ///
    /// Fails if a synchronous caller targets an async callee from a context
    /// that `check_blocking` rejects, or if any of the underlying state
    /// lookups fail.
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: StringEncoding,
        caller_info: CallerInfo,
    ) -> Result<()> {
        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // Where the caller expects the callee's result to be delivered.
        enum ResultInfo {
            // The result is written through a return pointer; `results` is
            // that pointer, taken from the caller's last parameter.
            Heap { results: u32 },
            // The result is returned as `result_count` flat values.
            Stack { result_count: u32 },
        }

        let result_info = match &caller_info {
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("retptr missing"),
                },
            },
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            // A sync caller with more than `MAX_FLAT_RESULTS` results also
            // passes a pointer as its last parameter.
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("arg ptr missing"),
                },
            },
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        // Captured now because `caller_info` is moved into the lowering
        // closure below.
        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(start);
        let return_ = SendSyncPtr::new(return_);
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let old_thread = state.current_guest_thread()?;

        // The current thread's task is expected to belong to the calling
        // instance.
        debug_assert_eq!(
            state.get_mut(old_thread.task)?.instance,
            self.runtime_instance(caller_instance)
        );

        let guest_thread = GuestTask::new(
            state,
            // Parameter lowering: run `start` on the caller's flat parameters
            // and copy the values it leaves behind into `dst`.
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes `count` parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                dst.copy_from_slice(&src[..dst.len()]);
                // Record a `Started` subtask event on the waitable for the
                // task of whichever guest thread is current at this point.
                let state = store.0.concurrent_state_mut();
                Waitable::Guest(state.current_guest_thread()?.task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                // Result lifting: run `return_` on the callee's flat results
                // (plus the return pointer, if the caller supplied one).
                lift: Box::new(move |store, src| {
                    // NOTE: the `unsafe` call further down relies on the same
                    // reasoning as the one in the parameter-lowering closure
                    // above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }

                    // Execute the `return_` hook, generated by Wasmtime's FACT
                    // compiler, in the context of the old thread. The old
                    // thread, this thread's caller, may have `realloc`
                    // callbacks invoked for example and those need the correct
                    // context set for the current thread.
                    let prev = store.0.set_thread(old_thread)?;

                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }

                    // Restore the previous current thread after the
                    // lifting/lowering has returned.
                    store.0.set_thread(prev)?;

                    let state = store.0.concurrent_state_mut();
                    let thread = state.current_guest_thread()?;
                    if sync_caller {
                        // For a sync caller, stash the (at most one) flat
                        // result on the current thread's task; nothing is
                        // stashed for the `Heap` case.
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    // No lifted value is returned from this closure; a
                    // `DummyResult` stands in as the boxed result.
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding,
            },
            Caller::Guest { thread: old_thread },
            None,
            self.runtime_instance(callee_instance),
            callee_async,
        )?;

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(guest_thread)?;
        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");

        Ok(())
    }
2786
2787    /// Call the specified callback function for an async-lifted export.
2788    ///
2789    /// SAFETY: `function` must be a valid reference to a guest function of the
2790    /// correct signature for a callback.
2791    unsafe fn call_callback<T>(
2792        self,
2793        mut store: StoreContextMut<T>,
2794        function: SendSyncPtr<VMFuncRef>,
2795        event: Event,
2796        handle: u32,
2797    ) -> Result<u32> {
2798        let (ordinal, result) = event.parts();
2799        let params = &mut [
2800            ValRaw::u32(ordinal),
2801            ValRaw::u32(handle),
2802            ValRaw::u32(result),
2803        ];
2804        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2805        // `wasmtime-cranelift`-generated fused adapter code or
2806        // `component::Options`.  Per `wasmparser` callback signature
2807        // validation, we know it takes three parameters and returns one.
2808        unsafe {
2809            crate::Func::call_unchecked_raw(
2810                &mut store,
2811                function.as_non_null(),
2812                params.as_mut_slice().into(),
2813            )?;
2814        }
2815        Ok(params[0].get_u32())
2816    }
2817
2818    /// Start a guest->guest call previously prepared using
2819    /// `Self::prepare_call`.
2820    ///
2821    /// This is called from fused adapter code generated in
2822    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2823    /// this function immediately after calling `Self::prepare_call`.
2824    ///
2825    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2826    /// functions with the appropriate signatures for the current guest task.
2827    /// If this is a call to an async-lowered import, the actual call may be
2828    /// deferred and run after this function returns, in which case the pointer
2829    /// arguments must also be valid when the call happens.
2830    unsafe fn start_call<T: 'static>(
2831        self,
2832        mut store: StoreContextMut<T>,
2833        callback: *mut VMFuncRef,
2834        post_return: *mut VMFuncRef,
2835        callee: NonNull<VMFuncRef>,
2836        param_count: u32,
2837        result_count: u32,
2838        flags: u32,
2839        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2840    ) -> Result<u32> {
2841        let token = StoreToken::new(store.as_context_mut());
2842        let async_caller = storage.is_none();
2843        let state = store.0.concurrent_state_mut();
2844        let guest_thread = state.current_guest_thread()?;
2845        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2846        let callee = SendSyncPtr::new(callee);
2847        let param_count = usize::try_from(param_count)?;
2848        assert!(param_count <= MAX_FLAT_PARAMS);
2849        let result_count = usize::try_from(result_count)?;
2850        assert!(result_count <= MAX_FLAT_RESULTS);
2851
2852        let task = state.get_mut(guest_thread.task)?;
2853        if let Some(callback) = NonNull::new(callback) {
2854            // We're calling an async-lifted export with a callback, so store
2855            // the callback and related context as part of the task so we can
2856            // call it later when needed.
2857            let callback = SendSyncPtr::new(callback);
2858            task.callback = Some(Box::new(move |store, event, handle| {
2859                let store = token.as_context_mut(store);
2860                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2861            }));
2862        }
2863
2864        let Caller::Guest { thread: caller } = &task.caller else {
2865            // As of this writing, `start_call` is only used for guest->guest
2866            // calls.
2867            bail_bug!("start_call unexpectedly invoked for host->guest call");
2868        };
2869        let caller = *caller;
2870        let caller_instance = state.get_mut(caller.task)?.instance;
2871
2872        // Queue the call as a "high priority" work item.
2873        unsafe {
2874            self.queue_call(
2875                store.as_context_mut(),
2876                guest_thread,
2877                callee,
2878                param_count,
2879                result_count,
2880                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2881                NonNull::new(callback).map(SendSyncPtr::new),
2882                NonNull::new(post_return).map(SendSyncPtr::new),
2883            )?;
2884        }
2885
2886        let state = store.0.concurrent_state_mut();
2887
2888        // Use the caller's `GuestThread::sync_call_set` to register interest in
2889        // the subtask...
2890        let guest_waitable = Waitable::Guest(guest_thread.task);
2891        let old_set = guest_waitable.common(state)?.set;
2892        let set = state.get_mut(caller.thread)?.sync_call_set;
2893        guest_waitable.join(state, Some(set))?;
2894
2895        store.0.set_thread(CurrentThread::None)?;
2896
2897        // ... and suspend this fiber temporarily while we wait for it to start.
2898        //
2899        // Note that we _could_ call the callee directly using the current fiber
2900        // rather than suspend this one, but that would make reasoning about the
2901        // event loop more complicated and is probably only worth doing if
2902        // there's a measurable performance benefit.  In addition, it would mean
2903        // blocking the caller if the callee calls a blocking sync-lowered
2904        // import, and as of this writing the spec says we must not do that.
2905        //
2906        // Alternatively, the fused adapter code could be modified to call the
2907        // callee directly without calling a host-provided intrinsic at all (in
2908        // which case it would need to do its own, inline backpressure checks,
2909        // etc.).  Again, we'd want to see a measurable performance benefit
2910        // before committing to such an optimization.  And again, we'd need to
2911        // update the spec to allow that.
2912        let (status, waitable) = loop {
2913            store.0.suspend(SuspendReason::Waiting {
2914                set,
2915                thread: caller,
2916                // Normally, `StoreOpaque::suspend` would assert it's being
2917                // called from a context where blocking is allowed.  However, if
2918                // `async_caller` is `true`, we'll only "block" long enough for
2919                // the callee to start, i.e. we won't repeat this loop, so we
2920                // tell `suspend` it's okay even if we're not allowed to block.
2921                // Alternatively, if the callee is not an async function, then
2922                // we know it won't block anyway.
2923                skip_may_block_check: async_caller || !callee_async,
2924            })?;
2925
2926            let state = store.0.concurrent_state_mut();
2927
2928            log::trace!("taking event for {:?}", guest_thread.task);
2929            let event = guest_waitable.take_event(state)?;
2930            let Some(Event::Subtask { status }) = event else {
2931                bail_bug!("subtasks should only get subtask events, got {event:?}")
2932            };
2933
2934            log::trace!("status {status:?} for {:?}", guest_thread.task);
2935
2936            if status == Status::Returned {
2937                // It returned, so we can stop waiting.
2938                break (status, None);
2939            } else if async_caller {
2940                // It hasn't returned yet, but the caller is calling via an
2941                // async-lowered import, so we generate a handle for the task
2942                // waitable and return the status.
2943                let handle = store
2944                    .0
2945                    .instance_state(caller_instance)
2946                    .handle_table()
2947                    .subtask_insert_guest(guest_thread.task.rep())?;
2948                store
2949                    .0
2950                    .concurrent_state_mut()
2951                    .get_mut(guest_thread.task)?
2952                    .common
2953                    .handle = Some(handle);
2954                break (status, Some(handle));
2955            } else {
2956                // The callee hasn't returned yet, and the caller is calling via
2957                // a sync-lowered import, so we loop and keep waiting until the
2958                // callee returns.
2959            }
2960        };
2961
2962        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2963
2964        // Reset the current thread to point to the caller as it resumes control.
2965        store.0.set_thread(caller)?;
2966        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2967        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2968
2969        if let Some(storage) = storage {
2970            // The caller used a sync-lowered import to call an async-lifted
2971            // export, in which case the result, if any, has been stashed in
2972            // `GuestTask::sync_result`.
2973            let state = store.0.concurrent_state_mut();
2974            let task = state.get_mut(guest_thread.task)?;
2975            if let Some(result) = task.sync_result.take()? {
2976                if let Some(result) = result {
2977                    storage[0] = MaybeUninit::new(result);
2978                }
2979
2980                if task.exited && task.ready_to_delete() {
2981                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2982                }
2983            }
2984        }
2985
2986        Ok(status.pack(waitable))
2987    }
2988
2989    /// Poll the specified future once on behalf of a guest->host call using an
2990    /// async-lowered import.
2991    ///
2992    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2993    /// `Pending`, add it to the set of futures to be polled as part of this
2994    /// instance's event loop until it completes, and then return
2995    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2996    ///
2997    /// Whether the future returns `Ready` immediately or later, the `lower`
2998    /// function will be used to lower the result, if any, into the guest caller's
2999    /// stack and linear memory. The `lower` function is invoked with `None` if
3000    /// the future is cancelled.
3001    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
3002        self,
3003        mut store: StoreContextMut<'_, T>,
3004        future: impl Future<Output = Result<R>> + Send + 'static,
3005        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
3006    ) -> Result<Option<u32>> {
3007        let token = StoreToken::new(store.as_context_mut());
3008        let state = store.0.concurrent_state_mut();
3009        let task = state.current_host_thread()?;
3010
3011        // Create an abortable future which hooks calls to poll and manages call
3012        // context state for the future.
3013        let (join_handle, future) = JoinHandle::run(future);
3014        {
3015            let state = &mut state.get_mut(task)?.state;
3016            assert!(matches!(state, HostTaskState::CalleeStarted));
3017            *state = HostTaskState::CalleeRunning(join_handle);
3018        }
3019
3020        let mut future = Box::pin(future);
3021
3022        // Finally, poll the future.  We can use a dummy `Waker` here because
3023        // we'll add the future to `ConcurrentState::futures` and poll it
3024        // automatically from the event loop if it doesn't complete immediately
3025        // here.
3026        let poll = tls::set(store.0, || {
3027            future
3028                .as_mut()
3029                .poll(&mut Context::from_waker(&Waker::noop()))
3030        });
3031
3032        match poll {
3033            // It finished immediately; lower the result and delete the task.
3034            Poll::Ready(result) => {
3035                let result = result.transpose()?;
3036                lower(store.as_context_mut(), result)?;
3037                return Ok(None);
3038            }
3039
3040            // Future isn't ready yet, so fall through.
3041            Poll::Pending => {}
3042        }
3043
3044        // It hasn't finished yet; add the future to
3045        // `ConcurrentState::futures` so it will be polled by the event
3046        // loop and allocate a waitable handle to return to the guest.
3047
3048        // Wrap the future in a closure responsible for lowering the result into
3049        // the guest's stack and memory, as well as notifying any waiters that
3050        // the task returned.
3051        let future = Box::pin(async move {
3052            let result = match future.await {
3053                Some(result) => Some(result?),
3054                None => None,
3055            };
3056            let on_complete = move |store: &mut dyn VMStore| {
3057                // Restore the `current_thread` to be the host so `lower` knows
3058                // how to manipulate borrows and knows which scope of borrows
3059                // to check.
3060                let mut store = token.as_context_mut(store);
3061                let old = store.0.set_thread(task)?;
3062
3063                let status = if result.is_some() {
3064                    Status::Returned
3065                } else {
3066                    Status::ReturnCancelled
3067                };
3068
3069                lower(store.as_context_mut(), result)?;
3070                let state = store.0.concurrent_state_mut();
3071                match &mut state.get_mut(task)?.state {
3072                    // The task is already flagged as finished because it was
3073                    // cancelled. No need to transition further.
3074                    HostTaskState::CalleeDone { .. } => {}
3075
3076                    // Otherwise transition this task to the done state.
3077                    other => *other = HostTaskState::CalleeDone { cancelled: false },
3078                }
3079                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3080
3081                store.0.set_thread(old)?;
3082                Ok(())
3083            };
3084
3085            // Here we schedule a task to run on a worker fiber to do the
3086            // lowering since it may involve a call to the guest's realloc
3087            // function. This is necessary because calling the guest while
3088            // there are host embedder frames on the stack is unsound.
3089            tls::get(move |store| {
3090                store
3091                    .concurrent_state_mut()
3092                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3093                        on_complete,
3094                    ))));
3095                Ok(())
3096            })
3097        });
3098
3099        // Make this task visible to the guest and then record what it
3100        // was made visible as.
3101        let state = store.0.concurrent_state_mut();
3102        state.push_future(future);
3103        let caller = state.get_mut(task)?.caller;
3104        let instance = state.get_mut(caller.task)?.instance;
3105        let handle = store
3106            .0
3107            .instance_state(instance)
3108            .handle_table()
3109            .subtask_insert_host(task.rep())?;
3110        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3111        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3112
3113        // Restore the currently running thread to this host task's
3114        // caller. Note that the host task isn't deallocated as it's
3115        // within the store and will get deallocated later.
3116        store.0.set_thread(caller)?;
3117        Ok(Some(handle))
3118    }
3119
3120    /// Implements the `task.return` intrinsic, lifting the result for the
3121    /// current guest task.
3122    pub(crate) fn task_return(
3123        self,
3124        store: &mut dyn VMStore,
3125        ty: TypeTupleIndex,
3126        options: OptionsIndex,
3127        storage: &[ValRaw],
3128    ) -> Result<()> {
3129        let state = store.concurrent_state_mut();
3130        let guest_thread = state.current_guest_thread()?;
3131        let lift = state
3132            .get_mut(guest_thread.task)?
3133            .lift_result
3134            .take()
3135            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3136        if !state.get_mut(guest_thread.task)?.result.is_none() {
3137            bail_bug!("task result unexpectedly already set");
3138        }
3139
3140        let CanonicalOptions {
3141            string_encoding,
3142            data_model,
3143            ..
3144        } = &self.id().get(store).component().env_component().options[options];
3145
3146        let invalid = ty != lift.ty
3147            || string_encoding != &lift.string_encoding
3148            || match data_model {
3149                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3150                    Some(memory) => {
3151                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3152                        let actual = self.id().get(store).runtime_memory(memory);
3153                        expected != actual.as_ptr()
3154                    }
3155                    // Memory not specified, meaning it didn't need to be
3156                    // specified per validation, so not invalid.
3157                    None => false,
3158                },
3159                // Always invalid as this isn't supported.
3160                CanonicalOptionsDataModel::Gc { .. } => true,
3161            };
3162
3163        if invalid {
3164            bail!(Trap::TaskReturnInvalid);
3165        }
3166
3167        log::trace!("task.return for {guest_thread:?}");
3168
3169        let result = (lift.lift)(store, storage)?;
3170        self.task_complete(store, guest_thread.task, result, Status::Returned)
3171    }
3172
3173    /// Implements the `task.cancel` intrinsic.
3174    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3175        let state = store.concurrent_state_mut();
3176        let guest_thread = state.current_guest_thread()?;
3177        let task = state.get_mut(guest_thread.task)?;
3178        if !task.cancel_sent {
3179            bail!(Trap::TaskCancelNotCancelled);
3180        }
3181        _ = task
3182            .lift_result
3183            .take()
3184            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3185
3186        if !task.result.is_none() {
3187            bail_bug!("task result should not bet set yet");
3188        }
3189
3190        log::trace!("task.cancel for {guest_thread:?}");
3191
3192        self.task_complete(
3193            store,
3194            guest_thread.task,
3195            Box::new(DummyResult),
3196            Status::ReturnCancelled,
3197        )
3198    }
3199
3200    /// Complete the specified guest task (i.e. indicate that it has either
3201    /// returned a (possibly empty) result or cancelled itself).
3202    ///
3203    /// This will return any resource borrows and notify any current or future
3204    /// waiters that the task has completed.
3205    fn task_complete(
3206        self,
3207        store: &mut StoreOpaque,
3208        guest_task: TableId<GuestTask>,
3209        result: Box<dyn Any + Send + Sync>,
3210        status: Status,
3211    ) -> Result<()> {
3212        store
3213            .component_resource_tables(Some(self))
3214            .validate_scope_exit()?;
3215
3216        let state = store.concurrent_state_mut();
3217        let task = state.get_mut(guest_task)?;
3218
3219        if let Caller::Host { tx, .. } = &mut task.caller {
3220            if let Some(tx) = tx.take() {
3221                _ = tx.send(result);
3222            }
3223        } else {
3224            task.result = Some(result);
3225            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3226        }
3227
3228        Ok(())
3229    }
3230
3231    /// Implements the `waitable-set.new` intrinsic.
3232    pub(crate) fn waitable_set_new(
3233        self,
3234        store: &mut StoreOpaque,
3235        caller_instance: RuntimeComponentInstanceIndex,
3236    ) -> Result<u32> {
3237        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3238        let handle = store
3239            .instance_state(self.runtime_instance(caller_instance))
3240            .handle_table()
3241            .waitable_set_insert(set.rep())?;
3242        log::trace!("new waitable set {set:?} (handle {handle})");
3243        Ok(handle)
3244    }
3245
3246    /// Implements the `waitable-set.drop` intrinsic.
3247    pub(crate) fn waitable_set_drop(
3248        self,
3249        store: &mut StoreOpaque,
3250        caller_instance: RuntimeComponentInstanceIndex,
3251        set: u32,
3252    ) -> Result<()> {
3253        let rep = store
3254            .instance_state(self.runtime_instance(caller_instance))
3255            .handle_table()
3256            .waitable_set_remove(set)?;
3257
3258        log::trace!("drop waitable set {rep} (handle {set})");
3259
3260        // Note that we're careful to check for waiters _before_ deleting the
3261        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3262        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3263        if !store
3264            .concurrent_state_mut()
3265            .get_mut(TableId::<WaitableSet>::new(rep))?
3266            .waiting
3267            .is_empty()
3268        {
3269            bail!(Trap::WaitableSetDropHasWaiters);
3270        }
3271
3272        store
3273            .concurrent_state_mut()
3274            .delete(TableId::<WaitableSet>::new(rep))?;
3275
3276        Ok(())
3277    }
3278
3279    /// Implements the `waitable.join` intrinsic.
3280    pub(crate) fn waitable_join(
3281        self,
3282        store: &mut StoreOpaque,
3283        caller_instance: RuntimeComponentInstanceIndex,
3284        waitable_handle: u32,
3285        set_handle: u32,
3286    ) -> Result<()> {
3287        let mut instance = self.id().get_mut(store);
3288        let waitable =
3289            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3290
3291        let set = if set_handle == 0 {
3292            None
3293        } else {
3294            let set = instance.instance_states().0[caller_instance]
3295                .handle_table()
3296                .waitable_set_rep(set_handle)?;
3297
3298            let state = store.concurrent_state_mut();
3299            if let Some(old) = waitable.common(state)?.set
3300                && state.get_mut(old)?.is_sync_call_set
3301            {
3302                bail!(Trap::WaitableSyncAndAsync);
3303            }
3304
3305            Some(TableId::<WaitableSet>::new(set))
3306        };
3307
3308        log::trace!(
3309            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3310        );
3311
3312        waitable.join(store.concurrent_state_mut(), set)
3313    }
3314
3315    /// Implements the `subtask.drop` intrinsic.
3316    pub(crate) fn subtask_drop(
3317        self,
3318        store: &mut StoreOpaque,
3319        caller_instance: RuntimeComponentInstanceIndex,
3320        task_id: u32,
3321    ) -> Result<()> {
3322        self.waitable_join(store, caller_instance, task_id, 0)?;
3323
3324        let (rep, is_host) = store
3325            .instance_state(self.runtime_instance(caller_instance))
3326            .handle_table()
3327            .subtask_remove(task_id)?;
3328
3329        let concurrent_state = store.concurrent_state_mut();
3330        let (waitable, delete) = if is_host {
3331            let id = TableId::<HostTask>::new(rep);
3332            let task = concurrent_state.get_mut(id)?;
3333            match &task.state {
3334                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3335                HostTaskState::CalleeDone { .. } => {}
3336                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3337                    bail_bug!("invalid state for callee in `subtask.drop`")
3338                }
3339            }
3340            (Waitable::Host(id), true)
3341        } else {
3342            let id = TableId::<GuestTask>::new(rep);
3343            let task = concurrent_state.get_mut(id)?;
3344            if task.lift_result.is_some() {
3345                bail!(Trap::SubtaskDropNotResolved);
3346            }
3347            (
3348                Waitable::Guest(id),
3349                concurrent_state.get_mut(id)?.ready_to_delete(),
3350            )
3351        };
3352
3353        waitable.common(concurrent_state)?.handle = None;
3354
3355        // If this subtask has an event that means that the terminal status of
3356        // this subtask wasn't yet received so it can't be dropped yet.
3357        if waitable.take_event(concurrent_state)?.is_some() {
3358            bail!(Trap::SubtaskDropNotResolved);
3359        }
3360
3361        if delete {
3362            waitable.delete_from(concurrent_state)?;
3363        }
3364
3365        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3366        Ok(())
3367    }
3368
3369    /// Implements the `waitable-set.wait` intrinsic.
3370    pub(crate) fn waitable_set_wait(
3371        self,
3372        store: &mut StoreOpaque,
3373        options: OptionsIndex,
3374        set: u32,
3375        payload: u32,
3376    ) -> Result<u32> {
3377        if !self.options(store, options).async_ {
3378            // The caller may only call `waitable-set.wait` from an async task
3379            // (i.e. a task created via a call to an async export).
3380            // Otherwise, we'll trap.
3381            store.check_blocking()?;
3382        }
3383
3384        let &CanonicalOptions {
3385            cancellable,
3386            instance: caller_instance,
3387            ..
3388        } = &self.id().get(store).component().env_component().options[options];
3389        let rep = store
3390            .instance_state(self.runtime_instance(caller_instance))
3391            .handle_table()
3392            .waitable_set_rep(set)?;
3393
3394        self.waitable_check(
3395            store,
3396            cancellable,
3397            WaitableCheck::Wait,
3398            WaitableCheckParams {
3399                set: TableId::new(rep),
3400                options,
3401                payload,
3402            },
3403        )
3404    }
3405
3406    /// Implements the `waitable-set.poll` intrinsic.
3407    pub(crate) fn waitable_set_poll(
3408        self,
3409        store: &mut StoreOpaque,
3410        options: OptionsIndex,
3411        set: u32,
3412        payload: u32,
3413    ) -> Result<u32> {
3414        let &CanonicalOptions {
3415            cancellable,
3416            instance: caller_instance,
3417            ..
3418        } = &self.id().get(store).component().env_component().options[options];
3419        let rep = store
3420            .instance_state(self.runtime_instance(caller_instance))
3421            .handle_table()
3422            .waitable_set_rep(set)?;
3423
3424        self.waitable_check(
3425            store,
3426            cancellable,
3427            WaitableCheck::Poll,
3428            WaitableCheckParams {
3429                set: TableId::new(rep),
3430                options,
3431                payload,
3432            },
3433        )
3434    }
3435
3436    /// Implements the `thread.index` intrinsic.
3437    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3438        let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3439        match store
3440            .concurrent_state_mut()
3441            .get_mut(thread_id)?
3442            .instance_rep
3443        {
3444            Some(r) => Ok(r),
3445            None => bail_bug!("thread should have instance_rep by now"),
3446        }
3447    }
3448
3449    /// Implements the `thread.new-indirect` intrinsic.
3450    pub(crate) fn thread_new_indirect<T: 'static>(
3451        self,
3452        mut store: StoreContextMut<T>,
3453        runtime_instance: RuntimeComponentInstanceIndex,
3454        _func_ty_idx: TypeFuncIndex, // currently unused
3455        start_func_table_idx: RuntimeTableIndex,
3456        start_func_idx: u32,
3457        context: i32,
3458    ) -> Result<u32> {
3459        log::trace!("creating new thread");
3460
3461        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3462        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3463        let callee = instance
3464            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3465            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3466        if callee.type_index(store.0) != start_func_ty.type_index() {
3467            bail!(Trap::ThreadNewIndirectInvalidType);
3468        }
3469
3470        let token = StoreToken::new(store.as_context_mut());
3471        let start_func = Box::new(
3472            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3473                let old_thread = store.set_thread(guest_thread)?;
3474                log::trace!(
3475                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3476                );
3477
3478                let mut store = token.as_context_mut(store);
3479                let mut params = [ValRaw::i32(context)];
3480                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3481                // on a separate fiber if we're running in an async store.
3482                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3483
3484                store.0.set_thread(old_thread)?;
3485
3486                store.0.cleanup_thread(
3487                    guest_thread,
3488                    self.runtime_instance(runtime_instance),
3489                    CleanupTask::Yes,
3490                )?;
3491                log::trace!("explicit thread {guest_thread:?} completed");
3492                let state = store.0.concurrent_state_mut();
3493                if let Some(t) = old_thread.guest() {
3494                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
3495                }
3496                log::trace!("thread start: restored {old_thread:?} as current thread");
3497
3498                Ok(())
3499            },
3500        );
3501
3502        let state = store.0.concurrent_state_mut();
3503        let current_thread = state.current_guest_thread()?;
3504        let parent_task = current_thread.task;
3505
3506        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3507        let thread_id = state.push(new_thread)?;
3508        state.get_mut(parent_task)?.threads.insert(thread_id);
3509
3510        log::trace!("new thread with id {thread_id:?} created");
3511
3512        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3513    }
3514
3515    pub(crate) fn resume_thread(
3516        self,
3517        store: &mut StoreOpaque,
3518        runtime_instance: RuntimeComponentInstanceIndex,
3519        thread_idx: u32,
3520        high_priority: bool,
3521        allow_ready: bool,
3522    ) -> Result<()> {
3523        let thread_id =
3524            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3525        let state = store.concurrent_state_mut();
3526        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3527        let thread = state.get_mut(guest_thread.thread)?;
3528
3529        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3530            GuestThreadState::NotStartedExplicit(start_func) => {
3531                log::trace!("starting thread {guest_thread:?}");
3532                let guest_call = WorkItem::GuestCall(
3533                    runtime_instance,
3534                    GuestCall {
3535                        thread: guest_thread,
3536                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3537                            start_func(store, guest_thread)
3538                        })),
3539                    },
3540                );
3541                store
3542                    .concurrent_state_mut()
3543                    .push_work_item(guest_call, high_priority);
3544            }
3545            GuestThreadState::Suspended(fiber) => {
3546                log::trace!("resuming thread {thread_id:?} that was suspended");
3547                store
3548                    .concurrent_state_mut()
3549                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3550            }
3551            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3552                log::trace!("resuming thread {thread_id:?} that was ready");
3553                thread.state = GuestThreadState::Ready { fiber, cancellable };
3554                store
3555                    .concurrent_state_mut()
3556                    .promote_thread_work_item(guest_thread);
3557            }
3558            other => {
3559                thread.state = other;
3560                bail!(Trap::CannotResumeThread);
3561            }
3562        }
3563        Ok(())
3564    }
3565
3566    fn add_guest_thread_to_instance_table(
3567        self,
3568        thread_id: TableId<GuestThread>,
3569        store: &mut StoreOpaque,
3570        runtime_instance: RuntimeComponentInstanceIndex,
3571    ) -> Result<u32> {
3572        let guest_id = store
3573            .instance_state(self.runtime_instance(runtime_instance))
3574            .thread_handle_table()
3575            .guest_thread_insert(thread_id.rep())?;
3576        store
3577            .concurrent_state_mut()
3578            .get_mut(thread_id)?
3579            .instance_rep = Some(guest_id);
3580        Ok(guest_id)
3581    }
3582
3583    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3584    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3585    pub(crate) fn suspension_intrinsic(
3586        self,
3587        store: &mut StoreOpaque,
3588        caller: RuntimeComponentInstanceIndex,
3589        cancellable: bool,
3590        yielding: bool,
3591        to_thread: SuspensionTarget,
3592    ) -> Result<WaitResult> {
3593        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3594        if to_thread.is_none() {
3595            let state = store.concurrent_state_mut();
3596            if yielding {
3597                // This is a `thread.yield` call
3598                if !state.may_block(guest_thread.task)? {
3599                    // In a non-blocking context, a `thread.yield` may trigger
3600                    // other threads in the same component instance to run.
3601                    if !state.promote_instance_local_thread_work_item(caller) {
3602                        // No other threads are runnable, so just return
3603                        return Ok(WaitResult::Completed);
3604                    }
3605                }
3606            } else {
3607                // The caller may only call `thread.suspend` from an async task
3608                // (i.e. a task created via a call to an async export).
3609                // Otherwise, we'll trap.
3610                store.check_blocking()?;
3611            }
3612        }
3613
3614        // There could be a pending cancellation from a previous uncancellable wait
3615        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3616            return Ok(WaitResult::Cancelled);
3617        }
3618
3619        match to_thread {
3620            SuspensionTarget::SomeSuspended(thread) => {
3621                self.resume_thread(store, caller, thread, true, false)?
3622            }
3623            SuspensionTarget::Some(thread) => {
3624                self.resume_thread(store, caller, thread, true, true)?
3625            }
3626            SuspensionTarget::None => { /* nothing to do */ }
3627        }
3628
3629        let reason = if yielding {
3630            SuspendReason::Yielding {
3631                thread: guest_thread,
3632                cancellable,
3633                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3634                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3635                // panic if we called it in a non-blocking context.
3636                skip_may_block_check: to_thread.is_some(),
3637            }
3638        } else {
3639            SuspendReason::ExplicitlySuspending {
3640                thread: guest_thread,
3641                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3642                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3643                // panic if we called it in a non-blocking context.
3644                skip_may_block_check: to_thread.is_some(),
3645            }
3646        };
3647
3648        store.suspend(reason)?;
3649
3650        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3651            Ok(WaitResult::Cancelled)
3652        } else {
3653            Ok(WaitResult::Completed)
3654        }
3655    }
3656
3657    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
        ///
        /// For `WaitableCheck::Wait` the current guest thread is suspended first
        /// when the task has no immediately available event and the `ready`
        /// collection of `params.set` is empty; `WaitableCheck::Poll` never
        /// suspends.  The event obtained from `get_event` is then reported to
        /// the guest: the handle and the event's result value are each written
        /// as 4 little-endian bytes at offsets 0 and 4 of the location validated
        /// from `params.payload`, and the event's ordinal is returned.
        ///
        /// `cancellable` decides whether a pending `Event::Cancelled` counts as
        /// immediately available, whether the thread is recorded in
        /// `wake_on_cancel` while it waits, and is forwarded to `get_event`.
        ///
        /// Errors from the state lookups, `suspend`, `get_event` and the payload
        /// bounds validation are propagated.  `bail_bug!` is raised if the
        /// thread already had a `wake_on_cancel` set, or if a wait produced no
        /// event.
3658    fn waitable_check(
3659        self,
3660        store: &mut StoreOpaque,
3661        cancellable: bool,
3662        check: WaitableCheck,
3663        params: WaitableCheckParams,
3664    ) -> Result<u32> {
3665        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3666
3667        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3668
3669        let state = store.concurrent_state_mut();
            // Looked up for both `Wait` and `Poll`, so a missing task is an error
            // in either mode.
3670        let task = state.get_mut(guest_thread.task)?;
3671
3672        // If we're waiting, and there are no events immediately available,
3673        // suspend the fiber until that changes.
3674        match &check {
3675            WaitableCheck::Wait => {
3676                let set = params.set;
3677
                    // "No event available" means: no task-level event at all, or
                    // only an `Event::Cancelled` that this non-cancellable wait
                    // must not observe.  The set is only consulted (and its lookup
                    // can only fail) when that first condition holds.
3678                if (task.event.is_none()
3679                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3680                    && state.get_mut(set)?.ready.is_empty()
3681                {
3682                    if cancellable {
                            // Record which set this thread waits on so that
                            // `subtask_cancel` can find it and wake it up.
3683                        let old = state
3684                            .get_mut(guest_thread.thread)?
3685                            .wake_on_cancel
3686                            .replace(set);
3687                        if !old.is_none() {
3688                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3689                        }
3690                    }
3691
3692                    store.suspend(SuspendReason::Waiting {
3693                        set,
3694                        thread: guest_thread,
3695                        skip_may_block_check: false,
3696                    })?;
3697                }
3698            }
3699            WaitableCheck::Poll => {}
3700        }
3701
3702        log::trace!(
3703            "waitable check for {guest_thread:?}; set {:?}, part two",
3704            params.set
3705        );
3706
3707        // Deliver any pending events to the guest and return.
3708        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3709
3710        let (ordinal, handle, result) = match &check {
3711            WaitableCheck::Wait => {
                    // A wait only gets here once something is deliverable, so a
                    // missing event is treated as an internal bug.
3712                let (event, waitable) = match event {
3713                    Some(p) => p,
3714                    None => bail_bug!("event expected to be present"),
3715                };
                    // Events without an associated waitable report handle 0.
3716                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3717                let (ordinal, result) = event.parts();
3718                (ordinal, handle, result)
3719            }
3720            WaitableCheck::Poll => {
3721                if let Some((event, waitable)) = event {
3722                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3723                    let (ordinal, result) = event.parts();
3724                    (ordinal, handle, result)
3725                } else {
3726                    log::trace!(
3727                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3728                        guest_thread.task,
3729                        params.set
3730                    );
                        // Nothing pending: report `Event::None` with handle 0.
3731                    let (ordinal, result) = Event::None.parts();
3732                    (ordinal, 0, result)
3733                }
3734            }
3735        };
3736        let memory = self.options_memory_mut(store, params.options);
            // Validate that a pointer pair fits at `params.payload` before
            // writing the two 4-byte little-endian fields.
3737        let ptr = crate::component::func::validate_inbounds_dynamic(
3738            &CanonicalAbiInfo::POINTER_PAIR,
3739            memory,
3740            &ValRaw::u32(params.payload),
3741        )?;
3742        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3743        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3744        Ok(ordinal)
3745    }
3746
3747    /// Implements the `subtask.cancel` intrinsic.
        ///
        /// The subtask is looked up by `task_id` in the handle table of
        /// `caller_instance` and is either a host task or a guest task.  The
        /// value returned (as a `u32`) is:
        ///
        /// - `Status::StartCancelled` if the guest subtask had not yet lowered
        ///   its parameters,
        /// - `BLOCKED` if `async_` is true and the subtask still has to be
        ///   waited on,
        /// - otherwise the `Status::Returned` or `Status::ReturnCancelled`
        ///   carried by the subtask's pending event.
        ///
        /// Fails with `Trap::SubtaskCancelAfterTerminal` if a host subtask was
        /// already cancelled, or if no such terminal event is pending once
        /// waiting is over.  When `async_` is false, `store.check_blocking()` is
        /// consulted first and the call may wait for the subtask's event.
3748    pub(crate) fn subtask_cancel(
3749        self,
3750        store: &mut StoreOpaque,
3751        caller_instance: RuntimeComponentInstanceIndex,
3752        async_: bool,
3753        task_id: u32,
3754    ) -> Result<u32> {
3755        if !async_ {
3756            // The caller may only sync call `subtask.cancel` from an async task
3757            // (i.e. a task created via a call to an async export).  Otherwise,
3758            // we'll trap.
3759            store.check_blocking()?;
3760        }
3761
            // Resolve the guest-visible handle to a table rep plus a flag saying
            // whether the subtask is implemented by the host.
3762        let (rep, is_host) = store
3763            .instance_state(self.runtime_instance(caller_instance))
3764            .handle_table()
3765            .subtask_rep(task_id)?;
3766        let waitable = if is_host {
3767            Waitable::Host(TableId::<HostTask>::new(rep))
3768        } else {
3769            Waitable::Guest(TableId::<GuestTask>::new(rep))
3770        };
3771        let concurrent_state = store.concurrent_state_mut();
3772
3773        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3774
            // Assigned exactly once on every path that reaches the shared tail
            // below.
3775        let needs_block;
3776        if let Waitable::Host(host_task) = waitable {
3777            let state = &mut concurrent_state.get_mut(host_task)?.state;
                // Mark the task as done-and-cancelled up front, then decide what
                // to do based on the state it was previously in.
3778            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3779                // If the callee is still running, signal an abort is requested.
3780                //
3781                // After cancelling this falls through to block waiting for the
3782                // host task to actually finish assuming that `async_` is false.
3783                // This blocking behavior resolves the race of `handle.abort()`
3784                // with the task actually getting cancelled or finishing.
3785                HostTaskState::CalleeRunning(handle) => {
3786                    handle.abort();
3787                    needs_block = true;
3788                }
3789
3790                // Cancellation was already requested, so fail as the task can't
3791                // be cancelled twice.
3792                HostTaskState::CalleeDone { cancelled } => {
3793                    if cancelled {
3794                        bail!(Trap::SubtaskCancelAfterTerminal);
3795                    } else {
3796                        // The callee is already done so there's no need to
3797                        // block further for an event.
3798                        needs_block = false;
3799                    }
3800                }
3801
3802                // These states should not be possible for a subtask that's
3803                // visible from the guest, so trap here.
3804                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3805                    bail_bug!("invalid states for host callee")
3806                }
3807            }
3808        } else {
3809            let guest_task = TableId::<GuestTask>::new(rep);
3810            let task = concurrent_state.get_mut(guest_task)?;
3811            if !task.already_lowered_parameters() {
                    // The subtask has not lowered its parameters yet, so it is
                    // cancelled via the dedicated helper and reported as
                    // `StartCancelled` without any blocking.
3812                store.cancel_guest_subtask_without_lowered_parameters(
3813                    self.runtime_instance(caller_instance),
3814                    guest_task,
3815                )?;
3816                return Ok(Status::StartCancelled as u32);
3817            } else if !task.returned_or_cancelled() {
3818                // Started, but not yet returned or cancelled; send the
3819                // `CANCELLED` event
3820                task.cancel_sent = true;
3821                // Note that this might overwrite an event that was set earlier
3822                // (e.g. `Event::None` if the task is yielding, or
3823                // `Event::Cancelled` if it was already cancelled), but that's
3824                // okay -- this should supersede the previous state.
3825                task.event = Some(Event::Cancelled);
3826                let runtime_instance = task.instance.index;
                    // Iterate over a clone of the thread list so that
                    // `concurrent_state` can be borrowed mutably inside the loop.
                    // The loop stops at the first thread found in a cancellable
                    // wait or yield (see the `break`s).
3827                for thread in task.threads.clone() {
3828                    let thread = QualifiedThreadId {
3829                        task: guest_task,
3830                        thread,
3831                    };
3832                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
3833                    if let Some(set) = thread_mut.wake_on_cancel.take() {
3834                        // The thread is in a cancellable wait, so wake it up:
3835                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3836                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3837                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3838                                runtime_instance,
3839                                GuestCall {
3840                                    thread,
3841                                    kind: GuestCallKind::DeliverEvent {
3842                                        instance,
3843                                        set: None,
3844                                    },
3845                                },
3846                            ),
3847                            None => bail_bug!("thread not present in wake_on_cancel set"),
3848                        };
3849                        concurrent_state.push_high_priority(item);
3850
                            // Yield the calling thread after queueing the
                            // high-priority work item for the woken thread.
3851                        let caller = concurrent_state.current_guest_thread()?;
3852                        store.suspend(SuspendReason::Yielding {
3853                            thread: caller,
3854                            cancellable: false,
3855                            // `subtask.cancel` is not allowed to be called in a
3856                            // sync context, so we cannot skip the may-block check.
3857                            skip_may_block_check: false,
3858                        })?;
3859                        break;
3860                    } else if let GuestThreadState::Ready {
3861                        cancellable: true, ..
3862                    } = &thread_mut.state
3863                    {
3864                        // The thread is in a cancellable yield, so yield back
3865                        // to it.
3866                        let caller = concurrent_state.current_guest_thread()?;
3867                        concurrent_state.promote_thread_work_item(thread);
3868                        store.suspend(SuspendReason::Yielding {
3869                            thread: caller,
3870                            cancellable: false,
3871                            skip_may_block_check: false,
3872                        })?;
3873                        break;
3874                    }
3875                }
3876
3877                // Guest tasks need to block if they have not yet returned or
3878                // cancelled, even as a result of the event delivery above.
3879                needs_block = !store
3880                    .concurrent_state_mut()
3881                    .get_mut(guest_task)?
3882                    .returned_or_cancelled()
3883            } else {
                    // Already returned or cancelled: nothing to wait for.
3884                needs_block = false;
3885            }
3886        };
3887
3888        // If we need to block waiting on the terminal status of this subtask
3889        // then return immediately in `async` mode, or otherwise wait for the
3890        // event to get signaled through the store.
3891        if needs_block {
3892            if async_ {
3893                return Ok(BLOCKED);
3894            }
3895
3896            // Wait for this waitable to get signaled with its terminal status
3897            // from the completion callback enqueued by `first_poll`. Once
3898            // that's done fall through to the shared event handling below.
3899            store.wait_for_event(waitable)?;
3900
3901            // .. fall through to determine what event's in store for us.
3902        }
3903
            // Only a `Returned` or `ReturnCancelled` subtask event is a valid
            // outcome here; a missing event or any other event traps.
3904        let event = waitable.take_event(store.concurrent_state_mut())?;
3905        if let Some(Event::Subtask {
3906            status: status @ (Status::Returned | Status::ReturnCancelled),
3907        }) = event
3908        {
3909            Ok(status as u32)
3910        } else {
3911            bail!(Trap::SubtaskCancelAfterTerminal);
3912        }
3913    }
3914}
3915
3916/// Trait representing component model ABI async intrinsics and fused adapter
3917/// helper functions.
3918///
3919/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3920/// which must be valid for at least the duration of the call (and possibly for
3921/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3922/// pointers used for async calls).
3923pub trait VMComponentAsyncStore {
3924    /// A helper function for fused adapter modules involving calls where at
3925    /// least one of the caller or callee is async.
3926    ///
3927    /// This helper is not used when the caller and callee both use the sync
3928    /// ABI; it is used only when at least one of them is async.
        ///
        /// `storage`/`storage_len` describe the values that are copied out as the
        /// call's parameters.  `result_count` is either a sync caller's result
        /// count or one of the `PREPARE_ASYNC_NO_RESULT` /
        /// `PREPARE_ASYNC_WITH_RESULT` sentinels identifying an async caller.
        ///
        /// # Safety
        ///
        /// `storage` must be a valid pointer to at least `storage_len` items.
        /// The remaining pointers are subject to the trait-level requirements.
3929    unsafe fn prepare_call(
3930        &mut self,
3931        instance: Instance,
3932        memory: *mut VMMemoryDefinition,
3933        start: NonNull<VMFuncRef>,
3934        return_: NonNull<VMFuncRef>,
3935        caller_instance: RuntimeComponentInstanceIndex,
3936        callee_instance: RuntimeComponentInstanceIndex,
3937        task_return_type: TypeTupleIndex,
3938        callee_async: bool,
3939        string_encoding: StringEncoding,
3940        result_count: u32,
3941        storage: *mut ValRaw,
3942        storage_len: usize,
3943    ) -> Result<()>;
3944
3945    /// A helper function for fused adapter modules involving calls where the
3946    /// caller is sync-lowered but the callee is async-lifted.
        ///
        /// # Safety
        ///
        /// `storage` must be a valid pointer to at least `storage_len` items.
        /// The remaining pointers are subject to the trait-level requirements.
3947    unsafe fn sync_start(
3948        &mut self,
3949        instance: Instance,
3950        callback: *mut VMFuncRef,
3951        callee: NonNull<VMFuncRef>,
3952        param_count: u32,
3953        storage: *mut MaybeUninit<ValRaw>,
3954        storage_len: usize,
3955    ) -> Result<()>;
3956
3957    /// A helper function for fused adapter modules involving calls where the
3958    /// caller is async-lowered.
        ///
        /// # Safety
        ///
        /// `callback`, `post_return` and `callee` are subject to the trait-level
        /// pointer requirements.
3959    unsafe fn async_start(
3960        &mut self,
3961        instance: Instance,
3962        callback: *mut VMFuncRef,
3963        post_return: *mut VMFuncRef,
3964        callee: NonNull<VMFuncRef>,
3965        param_count: u32,
3966        result_count: u32,
3967        flags: u32,
3968    ) -> Result<u32>;
3969
3970    /// The `future.write` intrinsic.
3971    fn future_write(
3972        &mut self,
3973        instance: Instance,
3974        caller: RuntimeComponentInstanceIndex,
3975        ty: TypeFutureTableIndex,
3976        options: OptionsIndex,
3977        future: u32,
3978        address: u32,
3979    ) -> Result<u32>;
3980
3981    /// The `future.read` intrinsic.
3982    fn future_read(
3983        &mut self,
3984        instance: Instance,
3985        caller: RuntimeComponentInstanceIndex,
3986        ty: TypeFutureTableIndex,
3987        options: OptionsIndex,
3988        future: u32,
3989        address: u32,
3990    ) -> Result<u32>;
3991
3992    /// The `future.drop-writable` intrinsic.
3993    fn future_drop_writable(
3994        &mut self,
3995        instance: Instance,
3996        ty: TypeFutureTableIndex,
3997        writer: u32,
3998    ) -> Result<()>;
3999
4000    /// The `stream.write` intrinsic.
4001    fn stream_write(
4002        &mut self,
4003        instance: Instance,
4004        caller: RuntimeComponentInstanceIndex,
4005        ty: TypeStreamTableIndex,
4006        options: OptionsIndex,
4007        stream: u32,
4008        address: u32,
4009        count: u32,
4010    ) -> Result<u32>;
4011
4012    /// The `stream.read` intrinsic.
4013    fn stream_read(
4014        &mut self,
4015        instance: Instance,
4016        caller: RuntimeComponentInstanceIndex,
4017        ty: TypeStreamTableIndex,
4018        options: OptionsIndex,
4019        stream: u32,
4020        address: u32,
4021        count: u32,
4022    ) -> Result<u32>;
4023
4024    /// The "fast-path" implementation of the `stream.write` intrinsic for
4025    /// "flat" (i.e. memcpy-able) payloads.
        ///
        /// `payload_size` and `payload_align` describe the flat payload layout
        /// (forwarded as `FlatAbi { size, align }`).
4026    fn flat_stream_write(
4027        &mut self,
4028        instance: Instance,
4029        caller: RuntimeComponentInstanceIndex,
4030        ty: TypeStreamTableIndex,
4031        options: OptionsIndex,
4032        payload_size: u32,
4033        payload_align: u32,
4034        stream: u32,
4035        address: u32,
4036        count: u32,
4037    ) -> Result<u32>;
4038
4039    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
4040    /// (i.e. memcpy-able) payloads.
        ///
        /// `payload_size` and `payload_align` describe the flat payload layout
        /// (forwarded as `FlatAbi { size, align }`).
4041    fn flat_stream_read(
4042        &mut self,
4043        instance: Instance,
4044        caller: RuntimeComponentInstanceIndex,
4045        ty: TypeStreamTableIndex,
4046        options: OptionsIndex,
4047        payload_size: u32,
4048        payload_align: u32,
4049        stream: u32,
4050        address: u32,
4051        count: u32,
4052    ) -> Result<u32>;
4053
4054    /// The `stream.drop-writable` intrinsic.
4055    fn stream_drop_writable(
4056        &mut self,
4057        instance: Instance,
4058        ty: TypeStreamTableIndex,
4059        writer: u32,
4060    ) -> Result<()>;
4061
4062    /// The `error-context.debug-message` intrinsic.
4063    fn error_context_debug_message(
4064        &mut self,
4065        instance: Instance,
4066        ty: TypeComponentLocalErrorContextTableIndex,
4067        options: OptionsIndex,
4068        err_ctx_handle: u32,
4069        debug_msg_address: u32,
4070    ) -> Result<()>;
4071
4072    /// The `thread.new-indirect` intrinsic.
4073    fn thread_new_indirect(
4074        &mut self,
4075        instance: Instance,
4076        caller: RuntimeComponentInstanceIndex,
4077        func_ty_idx: TypeFuncIndex,
4078        start_func_table_idx: RuntimeTableIndex,
4079        start_func_idx: u32,
4080        context: i32,
4081    ) -> Result<u32>;
4082}
4083
4084/// SAFETY: See trait docs.
4085impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4086    unsafe fn prepare_call(
4087        &mut self,
4088        instance: Instance,
4089        memory: *mut VMMemoryDefinition,
4090        start: NonNull<VMFuncRef>,
4091        return_: NonNull<VMFuncRef>,
4092        caller_instance: RuntimeComponentInstanceIndex,
4093        callee_instance: RuntimeComponentInstanceIndex,
4094        task_return_type: TypeTupleIndex,
4095        callee_async: bool,
4096        string_encoding: StringEncoding,
4097        result_count_or_max_if_async: u32,
4098        storage: *mut ValRaw,
4099        storage_len: usize,
4100    ) -> Result<()> {
4101        // SAFETY: The `wasmtime_cranelift`-generated code that calls
4102        // this method will have ensured that `storage` is a valid
4103        // pointer containing at least `storage_len` items.
4104        let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4105
4106        unsafe {
4107            instance.prepare_call(
4108                StoreContextMut(self),
4109                start,
4110                return_,
4111                caller_instance,
4112                callee_instance,
4113                task_return_type,
4114                callee_async,
4115                memory,
4116                string_encoding,
4117                match result_count_or_max_if_async {
4118                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4119                        params,
4120                        has_result: false,
4121                    },
4122                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4123                        params,
4124                        has_result: true,
4125                    },
4126                    result_count => CallerInfo::Sync {
4127                        params,
4128                        result_count,
4129                    },
4130                },
4131            )
4132        }
4133    }
4134
4135    unsafe fn sync_start(
4136        &mut self,
4137        instance: Instance,
4138        callback: *mut VMFuncRef,
4139        callee: NonNull<VMFuncRef>,
4140        param_count: u32,
4141        storage: *mut MaybeUninit<ValRaw>,
4142        storage_len: usize,
4143    ) -> Result<()> {
4144        unsafe {
4145            instance
4146                .start_call(
4147                    StoreContextMut(self),
4148                    callback,
4149                    ptr::null_mut(),
4150                    callee,
4151                    param_count,
4152                    1,
4153                    START_FLAG_ASYNC_CALLEE,
4154                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4155                    // this method will have ensured that `storage` is a valid
4156                    // pointer containing at least `storage_len` items.
4157                    Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4158                )
4159                .map(drop)
4160        }
4161    }
4162
4163    unsafe fn async_start(
4164        &mut self,
4165        instance: Instance,
4166        callback: *mut VMFuncRef,
4167        post_return: *mut VMFuncRef,
4168        callee: NonNull<VMFuncRef>,
4169        param_count: u32,
4170        result_count: u32,
4171        flags: u32,
4172    ) -> Result<u32> {
4173        unsafe {
4174            instance.start_call(
4175                StoreContextMut(self),
4176                callback,
4177                post_return,
4178                callee,
4179                param_count,
4180                result_count,
4181                flags,
4182                None,
4183            )
4184        }
4185    }
4186
4187    fn future_write(
4188        &mut self,
4189        instance: Instance,
4190        caller: RuntimeComponentInstanceIndex,
4191        ty: TypeFutureTableIndex,
4192        options: OptionsIndex,
4193        future: u32,
4194        address: u32,
4195    ) -> Result<u32> {
4196        instance
4197            .guest_write(
4198                StoreContextMut(self),
4199                caller,
4200                TransmitIndex::Future(ty),
4201                options,
4202                None,
4203                future,
4204                address,
4205                1,
4206            )
4207            .map(|result| result.encode())
4208    }
4209
4210    fn future_read(
4211        &mut self,
4212        instance: Instance,
4213        caller: RuntimeComponentInstanceIndex,
4214        ty: TypeFutureTableIndex,
4215        options: OptionsIndex,
4216        future: u32,
4217        address: u32,
4218    ) -> Result<u32> {
4219        instance
4220            .guest_read(
4221                StoreContextMut(self),
4222                caller,
4223                TransmitIndex::Future(ty),
4224                options,
4225                None,
4226                future,
4227                address,
4228                1,
4229            )
4230            .map(|result| result.encode())
4231    }
4232
4233    fn stream_write(
4234        &mut self,
4235        instance: Instance,
4236        caller: RuntimeComponentInstanceIndex,
4237        ty: TypeStreamTableIndex,
4238        options: OptionsIndex,
4239        stream: u32,
4240        address: u32,
4241        count: u32,
4242    ) -> Result<u32> {
4243        instance
4244            .guest_write(
4245                StoreContextMut(self),
4246                caller,
4247                TransmitIndex::Stream(ty),
4248                options,
4249                None,
4250                stream,
4251                address,
4252                count,
4253            )
4254            .map(|result| result.encode())
4255    }
4256
4257    fn stream_read(
4258        &mut self,
4259        instance: Instance,
4260        caller: RuntimeComponentInstanceIndex,
4261        ty: TypeStreamTableIndex,
4262        options: OptionsIndex,
4263        stream: u32,
4264        address: u32,
4265        count: u32,
4266    ) -> Result<u32> {
4267        instance
4268            .guest_read(
4269                StoreContextMut(self),
4270                caller,
4271                TransmitIndex::Stream(ty),
4272                options,
4273                None,
4274                stream,
4275                address,
4276                count,
4277            )
4278            .map(|result| result.encode())
4279    }
4280
4281    fn future_drop_writable(
4282        &mut self,
4283        instance: Instance,
4284        ty: TypeFutureTableIndex,
4285        writer: u32,
4286    ) -> Result<()> {
4287        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4288    }
4289
4290    fn flat_stream_write(
4291        &mut self,
4292        instance: Instance,
4293        caller: RuntimeComponentInstanceIndex,
4294        ty: TypeStreamTableIndex,
4295        options: OptionsIndex,
4296        payload_size: u32,
4297        payload_align: u32,
4298        stream: u32,
4299        address: u32,
4300        count: u32,
4301    ) -> Result<u32> {
4302        instance
4303            .guest_write(
4304                StoreContextMut(self),
4305                caller,
4306                TransmitIndex::Stream(ty),
4307                options,
4308                Some(FlatAbi {
4309                    size: payload_size,
4310                    align: payload_align,
4311                }),
4312                stream,
4313                address,
4314                count,
4315            )
4316            .map(|result| result.encode())
4317    }
4318
4319    fn flat_stream_read(
4320        &mut self,
4321        instance: Instance,
4322        caller: RuntimeComponentInstanceIndex,
4323        ty: TypeStreamTableIndex,
4324        options: OptionsIndex,
4325        payload_size: u32,
4326        payload_align: u32,
4327        stream: u32,
4328        address: u32,
4329        count: u32,
4330    ) -> Result<u32> {
4331        instance
4332            .guest_read(
4333                StoreContextMut(self),
4334                caller,
4335                TransmitIndex::Stream(ty),
4336                options,
4337                Some(FlatAbi {
4338                    size: payload_size,
4339                    align: payload_align,
4340                }),
4341                stream,
4342                address,
4343                count,
4344            )
4345            .map(|result| result.encode())
4346    }
4347
4348    fn stream_drop_writable(
4349        &mut self,
4350        instance: Instance,
4351        ty: TypeStreamTableIndex,
4352        writer: u32,
4353    ) -> Result<()> {
4354        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4355    }
4356
4357    fn error_context_debug_message(
4358        &mut self,
4359        instance: Instance,
4360        ty: TypeComponentLocalErrorContextTableIndex,
4361        options: OptionsIndex,
4362        err_ctx_handle: u32,
4363        debug_msg_address: u32,
4364    ) -> Result<()> {
4365        instance.error_context_debug_message(
4366            StoreContextMut(self),
4367            ty,
4368            options,
4369            err_ctx_handle,
4370            debug_msg_address,
4371        )
4372    }
4373
4374    fn thread_new_indirect(
4375        &mut self,
4376        instance: Instance,
4377        caller: RuntimeComponentInstanceIndex,
4378        func_ty_idx: TypeFuncIndex,
4379        start_func_table_idx: RuntimeTableIndex,
4380        start_func_idx: u32,
4381        context: i32,
4382    ) -> Result<u32> {
4383        instance.thread_new_indirect(
4384            StoreContextMut(self),
4385            caller,
4386            func_ty_idx,
4387            start_func_table_idx,
4388            start_func_idx,
4389            context,
4390        )
4391    }
4392}
4393
    /// A boxed, pinned, `Send + 'static` future resolving to `Result<()>`.
    ///
    /// NOTE(review): presumably the form in which host task futures are stored
    /// and polled by the event loop -- confirm against the use sites.
4394type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4395
4396/// Represents the state of a pending host task.
4397///
4398/// This is used to represent tasks when the guest calls into the host.
4399pub(crate) struct HostTask {
        /// Waitable bookkeeping for this task; starts out as
        /// `WaitableCommon::default()`.
4400    common: WaitableCommon,
4401
4402    /// Guest thread which called the host.
4403    caller: QualifiedThreadId,
4404
4405    /// State of borrows/etc the host needs to track. Used when the guest passes
4406    /// borrows to the host, for example.
4407    call_context: CallContext,
4408
        /// Current lifecycle state of the task; see `HostTaskState`.
4409    state: HostTaskState,
4410}
4411
    /// Lifecycle state of a `HostTask`.
4412enum HostTaskState {
4413    /// A host task has been created and it's considered "started".
4414    ///
4415    /// The host task has yet to enter `first_poll` or `poll_and_block` which
4416    /// is where this will get updated further.
4417    CalleeStarted,
4418
4419    /// State used for tasks in `first_poll` meaning that the guest did an async
4420    /// lower of a host async function which is blocked. The specified handle is
4421    /// linked to the future in the main `FuturesUnordered` of a store which is
4422    /// used to cancel it if the guest requests cancellation.
4423    CalleeRunning(JoinHandle),
4424
4425    /// Terminal state used for tasks in `poll_and_block` to store the result of
4426    /// their computation. Note that this state is not used for tasks in
4427    /// `first_poll`.
4428    CalleeFinished(LiftedResult),
4429
4430    /// Terminal state for host tasks meaning that the task was cancelled or the
4431    /// result was taken.
        ///
        /// `cancelled` is `true` once `subtask.cancel` has been requested for
        /// the task; a second cancellation request then traps.
4432    CalleeDone { cancelled: bool },
4433}
4434
4435impl HostTask {
4436    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4437        Self {
4438            common: WaitableCommon::default(),
4439            call_context: CallContext::default(),
4440            caller,
4441            state,
4442        }
4443    }
4444}
4445
4446impl TableDebug for HostTask {
4447    fn type_name() -> &'static str {
4448        "HostTask"
4449    }
4450}
4451
    /// A boxed, `Send + Sync + 'static` closure that takes the store, an `Event`
    /// and a `u32`, and returns `Result<u32>`.
    ///
    /// NOTE(review): presumably wraps a guest `callback` function, with the
    /// `u32` argument being a waitable handle and the result the callback's
    /// return code -- confirm against the use sites.
4452type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4453
4454/// Represents the caller of a given guest task.
4455enum Caller {
4456    /// The host called the guest task.
4457    Host {
4458        /// If present, may be used to deliver the result.
4459        tx: Option<oneshot::Sender<LiftedResult>>,
4460        /// If true, there's a host future that must be dropped before the task
4461        /// can be deleted.
4462        host_future_present: bool,
4463        /// Represents the caller of the host function which called back into a
4464        /// guest. Note that this thread could belong to an entirely unrelated
4465        /// top-level component instance than the one the host called into.
4466        caller: CurrentThread,
4467    },
4468    /// Another guest thread called the guest task.
4469    Guest {
4470        /// The id of the calling guest thread.
4471        thread: QualifiedThreadId,
4472    },
4473}
4474
4475/// Represents a closure and related canonical ABI parameters required to
4476/// validate a `task.return` call at runtime and lift the result.
4477struct LiftResult {
        /// The lifting closure itself.
4478    lift: RawLift,
        /// Tuple type index.
        /// NOTE(review): presumably the type a `task.return` call is validated
        /// against -- confirm.
4479    ty: TypeTupleIndex,
        /// Optional linear memory definition.
        /// NOTE(review): presumably `None` when no memory option is configured
        /// for the lift -- confirm.
4480    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
        /// String encoding to apply.
        /// NOTE(review): presumably used when lifting string results --
        /// confirm.
4481    string_encoding: StringEncoding,
4482}
4483
4484/// The table ID for a guest thread, qualified by the task to which it belongs.
4485///
4486/// This exists to minimize table lookups and the necessity to pass stores around mutably
4487/// for the common case of identifying the task to which a thread belongs.
    ///
    /// The derived ordering compares `task` first and `thread` second, following
    /// the field declaration order.
4488#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4489pub(crate) struct QualifiedThreadId {
        /// The task that owns the thread.
4490    task: TableId<GuestTask>,
        /// The thread itself.
4491    thread: TableId<GuestThread>,
4492}
4493
4494impl QualifiedThreadId {
4495    fn qualify(
4496        state: &mut ConcurrentState,
4497        thread: TableId<GuestThread>,
4498    ) -> Result<QualifiedThreadId> {
4499        Ok(QualifiedThreadId {
4500            task: state.get_mut(thread)?.parent_task,
4501            thread,
4502        })
4503    }
4504}
4505
4506impl fmt::Debug for QualifiedThreadId {
4507    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4508        f.debug_tuple("QualifiedThreadId")
4509            .field(&self.task.rep())
4510            .field(&self.thread.rep())
4511            .finish()
4512    }
4513}
4514
    /// Execution state of a `GuestThread`.
    ///
    /// NOTE(review): the transitions between these states happen outside this
    /// part of the file; the variant notes below are inferred from names and
    /// payloads unless stated otherwise -- confirm.
4515enum GuestThreadState {
        /// Not yet started.
        /// NOTE(review): presumably a thread created implicitly along with its
        /// task (cf. `GuestThread::new_implicit`) -- confirm.
4516    NotStartedImplicit,
        /// Not yet started; carries the boxed one-shot closure which is given
        /// the store and a `QualifiedThreadId`.
4517    NotStartedExplicit(
4518        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4519    ),
        /// NOTE(review): presumably the thread is currently executing.
4520    Running,
        /// Carries the thread's fiber while it is suspended.
4521    Suspended(StoreFiber<'static>),
        /// Carries the thread's fiber.  `subtask_cancel` treats
        /// `Ready { cancellable: true, .. }` as a thread in a cancellable yield.
4522    Ready {
4523        fiber: StoreFiber<'static>,
4524        cancellable: bool,
4525    },
        /// NOTE(review): presumably the thread has finished running.
4526    Completed,
4527}
/// Represents a guest thread, which always belongs to exactly one `GuestTask`
/// (see `parent_task`).
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    ///
    /// Both constructors in this file start it out as `None`.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// Created by the constructors with `is_sync_call_set: true`.
    sync_call_set: TableId<WaitableSet>,
}
4545
4546impl GuestThread {
4547    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4548    /// handle.
4549    fn from_instance(
4550        state: Pin<&mut ComponentInstance>,
4551        caller_instance: RuntimeComponentInstanceIndex,
4552        guest_thread: u32,
4553    ) -> Result<TableId<Self>> {
4554        let rep = state.instance_states().0[caller_instance]
4555            .thread_handle_table()
4556            .guest_thread_rep(guest_thread)?;
4557        Ok(TableId::new(rep))
4558    }
4559
4560    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4561        let sync_call_set = state.push(WaitableSet {
4562            is_sync_call_set: true,
4563            ..WaitableSet::default()
4564        })?;
4565        Ok(Self {
4566            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4567            parent_task,
4568            wake_on_cancel: None,
4569            state: GuestThreadState::NotStartedImplicit,
4570            instance_rep: None,
4571            sync_call_set,
4572        })
4573    }
4574
4575    fn new_explicit(
4576        state: &mut ConcurrentState,
4577        parent_task: TableId<GuestTask>,
4578        start_func: Box<
4579            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4580        >,
4581    ) -> Result<Self> {
4582        let sync_call_set = state.push(WaitableSet {
4583            is_sync_call_set: true,
4584            ..WaitableSet::default()
4585        })?;
4586        Ok(Self {
4587            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4588            parent_task,
4589            wake_on_cancel: None,
4590            state: GuestThreadState::NotStartedExplicit(start_func),
4591            instance_rep: None,
4592            sync_call_set,
4593        })
4594    }
4595}
4596
4597impl TableDebug for GuestThread {
4598    fn type_name() -> &'static str {
4599        "GuestThread"
4600    }
4601}
4602
/// Tracks the lowered result of a sync-to-async call (see
/// `GuestTask::sync_result`).
enum SyncResult {
    /// No result has been stored yet.
    NotProduced,
    /// A result has been stored and is waiting to be taken.
    Produced(Option<ValRaw>),
    /// `SyncResult::take` has already been called; taking again is an error.
    Taken,
}
4608
4609impl SyncResult {
4610    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4611        Ok(match mem::replace(self, SyncResult::Taken) {
4612            SyncResult::NotProduced => None,
4613            SyncResult::Produced(val) => Some(val),
4614            SyncResult::Taken => {
4615                bail_bug!("attempted to take a synchronous result that was already taken")
4616            }
4617        })
4618    }
4619}
4620
/// The state of the host future representing an async task (see
/// `GuestTask::host_future_state`).
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest callers and for host callers without a host future
    /// (see `GuestTask::new`).
    NotApplicable,
    /// The host future exists; `GuestTask::ready_to_delete` returns `false`
    /// while in this state.
    Live,
    // NOTE(review): presumably set once the host future has been dropped --
    // the transition is not visible in this part of the file.
    Dropped,
}
4627
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// Reset to `None` once the parameters have been lowered (see
    /// `GuestTask::already_lowered_parameters`).
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Reset to `None` once the task has returned or exited (see
    /// `GuestTask::returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
    ///
    /// Keeps track of `borrow<T>` received to this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    ///
    /// Must be empty before the task is considered ready to delete (see
    /// `GuestTask::ready_to_delete`).
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,

    /// Whether the increment of `ConcurrentState::interesting_tasks` made in
    /// `GuestTask::new` has been paired with a decrement.
    ///
    /// `Waitable::delete_from` debug-asserts that this is `true` when the task
    /// is deleted.
    decremented_interesting_task_count: bool,
}
4681
4682impl GuestTask {
4683    fn already_lowered_parameters(&self) -> bool {
4684        // We reset `lower_params` after we lower the parameters
4685        self.lower_params.is_none()
4686    }
4687
4688    fn returned_or_cancelled(&self) -> bool {
4689        // We reset `lift_result` after we return or exit
4690        self.lift_result.is_none()
4691    }
4692
4693    fn ready_to_delete(&self) -> bool {
4694        let threads_completed = self.threads.is_empty();
4695        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4696        let pending_completion_event = matches!(
4697            self.common.event,
4698            Some(Event::Subtask {
4699                status: Status::Returned | Status::ReturnCancelled
4700            })
4701        );
4702        let ready = threads_completed
4703            && !has_sync_result
4704            && !pending_completion_event
4705            && !matches!(self.host_future_state, HostFutureState::Live);
4706        log::trace!(
4707            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4708            threads_completed,
4709            has_sync_result,
4710            pending_completion_event,
4711            self.host_future_state
4712        );
4713        ready
4714    }
4715
4716    fn new(
4717        state: &mut ConcurrentState,
4718        lower_params: RawLower,
4719        lift_result: LiftResult,
4720        caller: Caller,
4721        callback: Option<CallbackFn>,
4722        instance: RuntimeInstance,
4723        async_function: bool,
4724    ) -> Result<QualifiedThreadId> {
4725        let host_future_state = match &caller {
4726            Caller::Guest { .. } => HostFutureState::NotApplicable,
4727            Caller::Host {
4728                host_future_present,
4729                ..
4730            } => {
4731                if *host_future_present {
4732                    HostFutureState::Live
4733                } else {
4734                    HostFutureState::NotApplicable
4735                }
4736            }
4737        };
4738        let task = state.push(Self {
4739            common: WaitableCommon::default(),
4740            lower_params: Some(lower_params),
4741            lift_result: Some(lift_result),
4742            result: None,
4743            callback,
4744            caller,
4745            call_context: CallContext::default(),
4746            sync_result: SyncResult::NotProduced,
4747            cancel_sent: false,
4748            starting_sent: false,
4749            instance,
4750            event: None,
4751            exited: false,
4752            threads: HashSet::new(),
4753            host_future_state,
4754            async_function,
4755            decremented_interesting_task_count: false,
4756        })?;
4757        let new_thread = GuestThread::new_implicit(state, task)?;
4758        let thread = state.push(new_thread)?;
4759        state.get_mut(task)?.threads.insert(thread);
4760        state.interesting_tasks += 1;
4761        Ok(QualifiedThreadId { task, thread })
4762    }
4763}
4764
4765impl TableDebug for GuestTask {
4766    fn type_name() -> &'static str {
4767        "GuestTask"
4768    }
4769}
4770
/// Represents state common to all kinds of waitables.
///
/// Each kind of waitable (host task, guest task, transmit handle) embeds one
/// of these in a field named `common`; see `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    ///
    /// Updated by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4781
/// Represents a Component Model Async `waitable`.
///
/// This is a small `Copy` handle wrapping the table ID of the underlying
/// object.  It derives `Ord` so that it can be stored in the
/// `WaitableSet::ready` `BTreeSet`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4792
4793impl Waitable {
4794    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4795    /// handle.
4796    fn from_instance(
4797        state: Pin<&mut ComponentInstance>,
4798        caller_instance: RuntimeComponentInstanceIndex,
4799        waitable: u32,
4800    ) -> Result<Self> {
4801        use crate::runtime::vm::component::Waitable;
4802
4803        let (waitable, kind) = state.instance_states().0[caller_instance]
4804            .handle_table()
4805            .waitable_rep(waitable)?;
4806
4807        Ok(match kind {
4808            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4809            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4810            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4811        })
4812    }
4813
4814    /// Retrieve the host-visible identifier for this `Waitable`.
4815    fn rep(&self) -> u32 {
4816        match self {
4817            Self::Host(id) => id.rep(),
4818            Self::Guest(id) => id.rep(),
4819            Self::Transmit(id) => id.rep(),
4820        }
4821    }
4822
4823    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4824    /// remove it from any set it may currently belong to (when `set` is
4825    /// `None`).
4826    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4827        log::trace!("waitable {self:?} join set {set:?}");
4828
4829        let old = mem::replace(&mut self.common(state)?.set, set);
4830
4831        if let Some(old) = old {
4832            match *self {
4833                Waitable::Host(id) => state.remove_child(id, old),
4834                Waitable::Guest(id) => state.remove_child(id, old),
4835                Waitable::Transmit(id) => state.remove_child(id, old),
4836            }?;
4837
4838            state.get_mut(old)?.ready.remove(self);
4839        }
4840
4841        if let Some(set) = set {
4842            match *self {
4843                Waitable::Host(id) => state.add_child(id, set),
4844                Waitable::Guest(id) => state.add_child(id, set),
4845                Waitable::Transmit(id) => state.add_child(id, set),
4846            }?;
4847
4848            if self.common(state)?.event.is_some() {
4849                self.mark_ready(state)?;
4850            }
4851        }
4852
4853        Ok(())
4854    }
4855
4856    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4857    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4858        Ok(match self {
4859            Self::Host(id) => &mut state.get_mut(*id)?.common,
4860            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4861            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4862        })
4863    }
4864
4865    /// Set or clear the pending event for this waitable and either deliver it
4866    /// to the first waiter, if any, or mark it as ready to be delivered to the
4867    /// next waiter that arrives.
4868    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4869        log::trace!("set event for {self:?}: {event:?}");
4870        self.common(state)?.event = event;
4871        self.mark_ready(state)
4872    }
4873
4874    /// Take the pending event from this waitable, leaving `None` in its place.
4875    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4876        let common = self.common(state)?;
4877        let event = common.event.take();
4878        if let Some(set) = self.common(state)?.set {
4879            state.get_mut(set)?.ready.remove(self);
4880        }
4881
4882        Ok(event)
4883    }
4884
4885    /// Deliver the current event for this waitable to the first waiter, if any,
4886    /// or else mark it as ready to be delivered to the next waiter that
4887    /// arrives.
4888    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4889        if let Some(set) = self.common(state)?.set {
4890            state.get_mut(set)?.ready.insert(*self);
4891            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4892                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4893                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4894
4895                let item = match mode {
4896                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4897                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4898                        state.get_mut(thread.task)?.instance.index,
4899                        GuestCall {
4900                            thread,
4901                            kind: GuestCallKind::DeliverEvent {
4902                                instance,
4903                                set: Some(set),
4904                            },
4905                        },
4906                    ),
4907                };
4908                state.push_high_priority(item);
4909            }
4910        }
4911        Ok(())
4912    }
4913
4914    /// Remove this waitable from the instance's rep table.
4915    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4916        match self {
4917            Self::Host(task) => {
4918                log::trace!("delete host task {task:?}");
4919                state.delete(*task)?;
4920            }
4921            Self::Guest(task) => {
4922                log::trace!("delete guest task {task:?}");
4923                let task = state.delete(*task)?;
4924
4925                // When a guest task is created it increments the
4926                // `ConcurrentState::interesting_tasks` counter, and that needs
4927                // to be paired with a decrement. There are a few situations in
4928                // which the decrement needs to happen which don't all funnel
4929                // through here, so in lieu of that at least try to catch issues
4930                // where we forgot to do a decrement.
4931                debug_assert!(task.decremented_interesting_task_count);
4932            }
4933            Self::Transmit(task) => {
4934                state.delete(*task)?;
4935            }
4936        }
4937
4938        Ok(())
4939    }
4940}
4941
4942impl fmt::Debug for Waitable {
4943    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4944        match self {
4945            Self::Host(id) => write!(f, "{id:?}"),
4946            Self::Guest(id) => write!(f, "{id:?}"),
4947            Self::Transmit(id) => write!(f, "{id:?}"),
4948        }
4949    }
4950}
4951
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ///
    /// Maintained by `Waitable::mark_ready`, `Waitable::take_event` and
    /// `Waitable::join`.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any.
    ///
    /// `Waitable::mark_ready` wakes the first entry in key order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
    /// Whether this set is a synthetic, internal one meant for handling
    /// synchronous calls.
    ///
    /// Set to `true` for the per-thread `GuestThread::sync_call_set`.
    is_sync_call_set: bool,
}
4963
4964impl TableDebug for WaitableSet {
4965    fn type_name() -> &'static str {
4966        "WaitableSet"
4967    }
4968}
4969
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure is given the store and a slice of uninitialized `ValRaw` slots
/// to write into.  It is stored in `GuestTask::lower_params`.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4973
4974/// Type-erased closure to lift the result for a guest task.
4975type RawLift = Box<
4976    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4977>;
4978
/// Type erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
///
/// This is what a `RawLift` closure returns and what `GuestTask::result`
/// stores.
type LiftedResult = Box<dyn Any + Send + Sync>;
4983
/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
// NOTE(review): no `LiftFn` is defined in this part of the file; this may be
// a stale name for `RawLift` -- confirm.
struct DummyResult;
4987
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Backpressure counter for this instance; backpressure is enabled while
    /// this is greater than zero.
    backpressure: u16,
    /// When `true`, this instance may not currently be entered.
    do_not_enter: bool,
    /// Pending calls for this instance, keyed by thread, which are waiting for
    /// backpressure to be lifted and/or for `Self::do_not_enter` to become
    /// `false` before they can proceed.
    // NOTE(review): this comment previously said the calls require
    // `Self::backpressure` to be `true`, but that field is a counter where
    // `> 0` means backpressure is enabled -- confirm that the intended
    // condition is the counter being zero.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4999
5000impl ConcurrentInstanceState {
5001    pub fn pending_is_empty(&self) -> bool {
5002        self.pending.is_empty()
5003    }
5004}
5005
/// Identifies the thread that is currently running, if any (see
/// `ConcurrentState::current_thread`).
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified together with its owning task.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// Nothing is running; this is the value in a default `ConcurrentState`.
    None,
}
5012
5013impl CurrentThread {
5014    fn guest(&self) -> Option<&QualifiedThreadId> {
5015        match self {
5016            Self::Guest(id) => Some(id),
5017            _ => None,
5018        }
5019    }
5020
5021    fn host(&self) -> Option<TableId<HostTask>> {
5022        match self {
5023            Self::Host(id) => Some(*id),
5024            _ => None,
5025        }
5026    }
5027
5028    fn is_none(&self) -> bool {
5029        matches!(self, Self::None)
5030    }
5031}
5032
5033impl From<QualifiedThreadId> for CurrentThread {
5034    fn from(id: QualifiedThreadId) -> Self {
5035        Self::Guest(id)
5036    }
5037}
5038
5039impl From<TableId<HostTask>> for CurrentThread {
5040    fn from(id: TableId<HostTask>) -> Self {
5041        Self::Host(id)
5042    }
5043}
5044
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running thread, if any.
    current_thread: CurrentThread,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `StoreContextMut::poll_until` (named in the module-level docs as
    /// the home of the event loop) for where we temporarily take this out,
    /// poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    ///
    /// New items are pushed at the front (see `Self::push_low_priority`).
    low_priority: VecDeque<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// The number of "interesting tasks" currently executing in the store.
    ///
    /// This tracks the concept of a component instance lifetime as defined in
    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
    /// all tasks currently increment this counter which then gets decremented
    /// when they exit. In the future some tasks might not increment this
    /// counter, but for now all do.
    ///
    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
    /// inform the embedder when all tasks have completed. This is then
    /// used in wasmtime-wasi-http, for example, to know when an instance is
    /// idle.
    interesting_tasks: usize,

    /// Single waker to notify when `interesting_tasks` reaches 0.
    ///
    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
    interesting_tasks_empty_waker: Option<Waker>,
}
5106
5107impl Default for ConcurrentState {
5108    fn default() -> Self {
5109        Self {
5110            current_thread: CurrentThread::None,
5111            table: AlwaysMut::new(ResourceTable::new()),
5112            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5113            high_priority: Vec::new(),
5114            low_priority: VecDeque::new(),
5115            suspend_reason: None,
5116            worker: None,
5117            worker_item: None,
5118            global_error_context_ref_counts: BTreeMap::new(),
5119            interesting_tasks: 0,
5120            interesting_tasks_empty_waker: None,
5121        }
5122    }
5123}
5124
5125impl ConcurrentState {
5126    /// Take ownership of any fibers and futures owned by this object.
5127    ///
5128    /// This should be used when disposing of the `Store` containing this object
5129    /// in order to gracefully resolve any and all fibers using
5130    /// `StoreFiber::dispose`.  This is necessary to avoid possible
5131    /// use-after-free bugs due to fibers which may still have access to the
5132    /// `Store`.
5133    ///
5134    /// Additionally, the futures collected with this function should be dropped
5135    /// within a `tls::set` call, which will ensure than any futures closing
5136    /// over an `&Accessor` will have access to the store when dropped, allowing
5137    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
5138    /// panicking.
5139    ///
5140    /// Note that this will leave the object in an inconsistent and unusable
5141    /// state, so it should only be used just prior to dropping it.
5142    pub(crate) fn take_fibers_and_futures(
5143        &mut self,
5144        fibers: &mut Vec<StoreFiber<'static>>,
5145        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5146    ) {
5147        for entry in self.table.get_mut().iter_mut() {
5148            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5149                for mode in mem::take(&mut set.waiting).into_values() {
5150                    if let WaitMode::Fiber(fiber) = mode {
5151                        fibers.push(fiber);
5152                    }
5153                }
5154            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5155                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5156                    mem::replace(&mut thread.state, GuestThreadState::Completed)
5157                {
5158                    fibers.push(fiber);
5159                }
5160            }
5161        }
5162
5163        if let Some(fiber) = self.worker.take() {
5164            fibers.push(fiber);
5165        }
5166
5167        let mut handle_item = |item| match item {
5168            WorkItem::ResumeFiber(fiber) => {
5169                fibers.push(fiber);
5170            }
5171            WorkItem::PushFuture(future) => {
5172                self.futures
5173                    .get_mut()
5174                    .as_mut()
5175                    .unwrap()
5176                    .push(future.into_inner());
5177            }
5178            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5179            }
5180        };
5181
5182        for item in mem::take(&mut self.high_priority) {
5183            handle_item(item);
5184        }
5185        for item in mem::take(&mut self.low_priority) {
5186            handle_item(item);
5187        }
5188
5189        if let Some(them) = self.futures.get_mut().take() {
5190            futures.push(them);
5191        }
5192    }
5193
5194    #[cfg(feature = "gc")]
5195    pub(crate) fn trace_fiber_roots(
5196        &mut self,
5197        modules: &ModuleRegistry,
5198        unwind: &dyn Unwind,
5199        gc_roots_list: &mut GcRootsList,
5200    ) {
5201        let ConcurrentState {
5202            table,
5203            worker,
5204            high_priority,
5205            low_priority,
5206
5207            // TODO(cm-gc): This field contains `ValRaw`s, but they are never GC
5208            // references because the component model doesn't support GC yet. We
5209            // will need to trace these somehow when it does.
5210            futures: _,
5211
5212            // These fields do not contain GC references.
5213            worker_item: _,
5214            current_thread: _,
5215            suspend_reason: _,
5216            global_error_context_ref_counts: _,
5217            interesting_tasks: _,
5218            interesting_tasks_empty_waker: _,
5219        } = self;
5220
5221        for entry in table.get_mut().iter_mut() {
5222            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5223                for mode in set.waiting.values_mut() {
5224                    if let WaitMode::Fiber(fiber) = mode {
5225                        fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5226                    }
5227                }
5228            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5229                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5230                    &mut thread.state
5231                {
5232                    fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5233                }
5234            }
5235        }
5236
5237        if let Some(fiber) = worker {
5238            fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5239        }
5240
5241        let mut handle_item = |item: &mut WorkItem| match item {
5242            WorkItem::ResumeFiber(fiber) => {
5243                fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5244            }
5245            WorkItem::PushFuture(_future) => {
5246                // TODO(cm-gc): once futures can contain GC roots, we will need
5247                // to trace them.
5248            }
5249            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5250            }
5251        };
5252
5253        for item in high_priority {
5254            handle_item(item);
5255        }
5256        for item in low_priority {
5257            handle_item(item);
5258        }
5259    }
5260
5261    fn push<V: Send + Sync + 'static>(
5262        &mut self,
5263        value: V,
5264    ) -> Result<TableId<V>, ResourceTableError> {
5265        self.table.get_mut().push(value).map(TableId::from)
5266    }
5267
5268    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5269        self.table.get_mut().get_mut(&Resource::from(id))
5270    }
5271
5272    pub fn add_child<T: 'static, U: 'static>(
5273        &mut self,
5274        child: TableId<T>,
5275        parent: TableId<U>,
5276    ) -> Result<(), ResourceTableError> {
5277        self.table
5278            .get_mut()
5279            .add_child(Resource::from(child), Resource::from(parent))
5280    }
5281
5282    pub fn remove_child<T: 'static, U: 'static>(
5283        &mut self,
5284        child: TableId<T>,
5285        parent: TableId<U>,
5286    ) -> Result<(), ResourceTableError> {
5287        self.table
5288            .get_mut()
5289            .remove_child(Resource::from(child), Resource::from(parent))
5290    }
5291
5292    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5293        self.table.get_mut().delete(Resource::from(id))
5294    }
5295
5296    fn push_future(&mut self, future: HostTaskFuture) {
5297        // Note that we can't directly push to `ConcurrentState::futures` here
5298        // since this may be called from a future that's being polled inside
5299        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5300        // so it has exclusive access while polling it.  Therefore, we push a
5301        // work item to the "high priority" queue, which will actually push to
5302        // `ConcurrentState::futures` later.
5303        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5304    }
5305
5306    fn push_high_priority(&mut self, item: WorkItem) {
5307        log::trace!("push high priority: {item:?}");
5308        self.high_priority.push(item);
5309    }
5310
5311    fn push_low_priority(&mut self, item: WorkItem) {
5312        log::trace!("push low priority: {item:?}");
5313        self.low_priority.push_front(item);
5314    }
5315
5316    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5317        if high_priority {
5318            self.push_high_priority(item);
5319        } else {
5320            self.push_low_priority(item);
5321        }
5322    }
5323
5324    fn promote_instance_local_thread_work_item(
5325        &mut self,
5326        current_instance: RuntimeComponentInstanceIndex,
5327    ) -> bool {
5328        self.promote_work_items_matching(|item: &WorkItem| match item {
5329            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5330                *instance == current_instance
5331            }
5332            _ => false,
5333        })
5334    }
5335
5336    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5337        self.promote_work_items_matching(|item: &WorkItem| match item {
5338            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5339                *t == thread
5340            }
5341            _ => false,
5342        })
5343    }
5344
5345    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5346    where
5347        F: FnMut(&WorkItem) -> bool,
5348    {
5349        // If there's a high-priority work item to resume the current guest thread,
5350        // we don't need to promote anything, but we return true to indicate that
5351        // work is pending for the current instance.
5352        if self.high_priority.iter().any(&mut predicate) {
5353            true
5354        }
5355        // Otherwise, look for a low-priority work item that matches the current
5356        // instance and promote it to high-priority.
5357        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5358            let item = self.low_priority.remove(idx).unwrap();
5359            self.push_high_priority(item);
5360            true
5361        } else {
5362            false
5363        }
5364    }
5365
5366    /// Returns whether there's a pending cancellation on the current guest thread,
5367    /// consuming the event if so.
5368    fn take_pending_cancellation(&mut self) -> Result<bool> {
5369        let thread = self.current_guest_thread()?;
5370        let task = self.get_mut(thread.task)?;
5371        if let Some(Event::Cancelled) = task.event {
5372            task.event.take();
5373            return Ok(true);
5374        }
5375        Ok(false)
5376    }
5377
5378    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5379        if self.may_block(task)? {
5380            Ok(())
5381        } else {
5382            Err(Trap::CannotBlockSyncTask.into())
5383        }
5384    }
5385
5386    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5387        let task = self.get_mut(task)?;
5388        Ok(task.async_function || task.returned_or_cancelled())
5389    }
5390
5391    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5392    /// specified task.
5393    ///
5394    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5395    /// below.
5396    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5397        let (task, is_host) = (task >> 1, task & 1 == 1);
5398        if is_host {
5399            let task: TableId<HostTask> = TableId::new(task);
5400            Ok(&mut self.get_mut(task)?.call_context)
5401        } else {
5402            let task: TableId<GuestTask> = TableId::new(task);
5403            Ok(&mut self.get_mut(task)?.call_context)
5404        }
5405    }
5406
5407    /// Used by `ResourceTables` to record the scope of a borrow to get undone
5408    /// in the future.
5409    pub fn current_call_context_scope_id(&self) -> Result<u32> {
5410        let (bits, is_host) = match self.current_thread {
5411            CurrentThread::Guest(id) => (id.task.rep(), false),
5412            CurrentThread::Host(id) => (id.rep(), true),
5413            CurrentThread::None => bail_bug!("current thread is not set"),
5414        };
5415        assert_eq!((bits << 1) >> 1, bits);
5416        Ok((bits << 1) | u32::from(is_host))
5417    }
5418
5419    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5420        match self.current_thread.guest() {
5421            Some(id) => Ok(*id),
5422            None => bail_bug!("current thread is not a guest thread"),
5423        }
5424    }
5425
5426    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5427        match self.current_thread.host() {
5428            Some(id) => Ok(id),
5429            None => bail_bug!("current thread is not a host thread"),
5430        }
5431    }
5432
5433    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5434        match self.futures.get_mut().as_mut() {
5435            Some(f) => Ok(f),
5436            None => bail_bug!("futures field of concurrent state is currently taken"),
5437        }
5438    }
5439
5440    pub(crate) fn table(&mut self) -> &mut ResourceTable {
5441        self.table.get_mut()
5442    }
5443
5444    /// Returns the parent thread, if any, of `cur`.
5445    fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5446        match cur {
5447            CurrentThread::Guest(thread) => {
5448                let task = self.get_mut(thread.task).ok()?;
5449                Some(match task.caller {
5450                    Caller::Host { caller, .. } => caller,
5451                    Caller::Guest { thread } => thread.into(),
5452                })
5453            }
5454            CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5455            CurrentThread::None => None,
5456        }
5457    }
5458}
5459
5460/// Provide a type hint to compiler about the shape of a parameter lower
5461/// closure.
5462fn for_any_lower<
5463    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5464>(
5465    fun: F,
5466) -> F {
5467    fun
5468}
5469
5470/// Provide a type hint to compiler about the shape of a result lift closure.
5471fn for_any_lift<
5472    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5473>(
5474    fun: F,
5475) -> F {
5476    fun
5477}
5478
5479fn check_ambient_store(id: StoreId) {
5480    let message = "\
5481        `Future`s which depend on asynchronous component tasks, streams, or \
5482        futures to complete may only be polled from the event loop of the \
5483        store to which they belong.  Please use \
5484        `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5485    ";
5486    tls::try_get(|store| {
5487        let matched = match store {
5488            tls::TryGet::Some(store) => store.id() == id,
5489            tls::TryGet::Taken | tls::TryGet::None => false,
5490        };
5491
5492        if !matched {
5493            panic!("{message}")
5494        }
5495    });
5496}
5497
5498/// Assert that `StoreContextMut::run_concurrent` has not been called from
5499/// within an store's event loop.
5500fn check_recursive_run() {
5501    tls::try_get(|store| {
5502        if !matches!(store, tls::TryGet::None) {
5503            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5504        }
5505    });
5506}
5507
/// Split a packed callback return value into `(low 4 bits, remaining upper
/// bits shifted down by 4)`.
// NOTE(review): presumably the first element is the callback code and the
// second its accompanying payload (e.g. a waitable-set index) — confirm
// against the callers.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low_nibble = code & 0xF;
    let upper_bits = code >> 4;
    (low_nibble, upper_bits)
}
5511
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// Table id of the waitable set the call operates on.
    set: TableId<WaitableSet>,
    /// Index of the options associated with the call.
    // NOTE(review): presumably the canonical options of the intrinsic being
    // invoked — confirm against `waitable_check`.
    options: OptionsIndex,
    /// Raw `u32` payload argument of the call.
    // NOTE(review): presumably a guest address where the event payload is
    // written — confirm against `waitable_check`.
    payload: u32,
}
5520
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The check is being made on behalf of `waitable-set.wait`.
    Wait,
    /// The check is being made on behalf of `waitable-set.poll`.
    Poll,
}
5527
/// An identifier representing a guest task within a component.
///
/// This can be acquired by calling [`Func::start_call_concurrent`] or
/// [`TypedFunc::start_call_concurrent`] and then using the
/// [`FuncCallConcurrent::task`] accessor, for example. This can then be
/// reflected on with [`StoreContextMut::async_call_stack`].
///
/// Internally this is a newtype around the task's `TableId<GuestTask>`; the
/// wrapped id is not exposed publicly.
///
/// [`TypedFunc::start_call_concurrent`]: crate::component::TypedFunc::start_call_concurrent
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct GuestTaskId(TableId<GuestTask>);
5538
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// A `PreparedCall` is turned into a running call by `QueuedCall::new`, which
/// consumes it and queues the call.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The instance that this call is prepared for.
    runtime_instance: RuntimeInstance,
    /// Ties the otherwise unused type parameter `R` to this struct.
    _phantom: PhantomData<R>,
}
5554
5555impl<R> PreparedCall<R> {
5556    /// Get a copy of the `TaskId` for this `PreparedCall`.
5557    pub(crate) fn task_id(&self) -> TaskId {
5558        TaskId {
5559            task: self.thread.task,
5560            runtime_instance: self.runtime_instance,
5561        }
5562    }
5563}
5564
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
    /// The runtime instance recorded for the task when the call was prepared.
    runtime_instance: RuntimeInstance,
}
5570
5571impl TaskId {
5572    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5573    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5574    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5575    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5576    /// and delete the task when all threads are done.
5577    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5578        let task = store.concurrent_state_mut().get_mut(self.task)?;
5579        let delete = if !task.already_lowered_parameters() {
5580            store.cancel_guest_subtask_without_lowered_parameters(
5581                self.runtime_instance,
5582                self.task,
5583            )?;
5584            true
5585        } else {
5586            task.host_future_state = HostFutureState::Dropped;
5587            task.ready_to_delete()
5588        };
5589        if delete {
5590            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5591        }
5592        Ok(())
5593    }
5594}
5595
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
///
/// # Parameters
///
/// - `handle`: the guest export to call.
/// - `param_count`: stored unchanged in the returned `PreparedCall`.
/// - `host_future_present`: recorded in the task's `Caller::Host` entry.
/// - `lower_params`: wrapped in a boxed closure and handed to
///   `GuestTask::new` as the parameter-lowering function.
/// - `lift_result`: wrapped in a boxed closure and stored in the task's
///   `LiftResult` as the result-lifting function.
///
/// # Errors
///
/// Propagates any error from `GuestTask::new` or `may_enter`, and fails with
/// `Trap::CannotEnterComponent` when `may_enter` reports that the target
/// instance cannot be entered.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Read everything needed from the instance up front; these values are
    // copied out before `store` is used again to create the token below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // Channel over which the lifted result is delivered: `tx` goes into the
    // task's `Caller::Host`, `rx` into the returned `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    let instance = handle.instance().runtime_instance(component_instance);
    // The thread that is current at preparation time is recorded as the
    // caller of the new task.
    let caller = state.current_thread;
    let thread = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // NOTE(review): this check runs after the task has been created, so on
    // failure the new task appears to stay in the table — presumably
    // acceptable because the call fails with a trap; confirm.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5681
/// A call that has been queued via `QueuedCall::new`.
///
/// Implements `Future`, resolving to the call's result of type `R`.
pub(crate) struct QueuedCall<R> {
    /// Id of the store the call was queued on; checked against the ambient
    /// store each time the future is polled.
    store: StoreId,
    /// Table id of the guest task performing the call.
    task: TableId<GuestTask>,
    /// Receiver on which the lifted result of the call is delivered.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties the result type `R` to this struct without storing a value of it.
    _marker: PhantomData<fn() -> R>,
}
5688
5689impl<R> QueuedCall<R> {
5690    /// Queue a call previously prepared using `prepare_call` to be run as part of
5691    /// the associated `ComponentInstance`'s event loop.
5692    ///
5693    /// The returned future will resolve to the result once it is available, but
5694    /// must only be polled via the instance's event loop. See
5695    /// `StoreContextMut::run_concurrent` for details.
5696    pub(crate) fn new<T: 'static>(
5697        mut store: StoreContextMut<T>,
5698        prepared: PreparedCall<R>,
5699    ) -> Result<QueuedCall<R>> {
5700        let PreparedCall {
5701            handle,
5702            thread,
5703            param_count,
5704            rx,
5705            ..
5706        } = prepared;
5707
5708        queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5709
5710        Ok(QueuedCall {
5711            store: store.0.id(),
5712            task: thread.task,
5713            rx,
5714            _marker: PhantomData,
5715        })
5716    }
5717
5718    fn task(&self) -> GuestTaskId {
5719        GuestTaskId(self.task)
5720    }
5721}
5722
5723impl<R> Future for QueuedCall<R>
5724where
5725    R: 'static,
5726{
5727    type Output = Result<R>;
5728
5729    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5730        check_ambient_store(self.store);
5731        Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5732            Ok(r) => match r.downcast() {
5733                Ok(r) => Ok(*r),
5734                Err(_) => bail_bug!("wrong type of value produced"),
5735            },
5736            Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5737        })
5738    }
5739}
5740
5741/// Queue a call previously prepared using `prepare_call` to be run as part of
5742/// the associated `ComponentInstance`'s event loop.
5743fn queue_call0<T: 'static>(
5744    store: StoreContextMut<T>,
5745    handle: Func,
5746    guest_thread: QualifiedThreadId,
5747    param_count: usize,
5748) -> Result<()> {
5749    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5750    let is_concurrent = raw_options.async_;
5751    let callback = raw_options.callback;
5752    let instance = handle.instance();
5753    let callee = handle.lifted_core_func(store.0);
5754    let post_return = handle.post_return_core_func(store.0);
5755    let callback = callback.map(|i| {
5756        let instance = instance.id().get(store.0);
5757        SendSyncPtr::new(instance.runtime_callback(i))
5758    });
5759
5760    log::trace!("queueing call {guest_thread:?}");
5761
5762    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5763    // (with signatures appropriate for this call) and will remain valid as
5764    // long as this instance is valid.
5765    unsafe {
5766        instance.queue_call(
5767            store,
5768            guest_thread,
5769            SendSyncPtr::new(callee),
5770            param_count,
5771            1,
5772            is_concurrent,
5773            callback,
5774            post_return.map(SendSyncPtr::new),
5775        )
5776    }
5777}