Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61use crate::prelude::*;
62use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
63use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
64use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
65use crate::{
66    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
67};
68use alloc::borrow::ToOwned;
69use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
70use core::any::Any;
71use core::cell::UnsafeCell;
72use core::fmt;
73use core::future;
74use core::future::Future;
75use core::marker::PhantomData;
76use core::mem::{self, ManuallyDrop, MaybeUninit};
77use core::ops::DerefMut;
78use core::pin::{Pin, pin};
79use core::ptr::{self, NonNull};
80use core::task::{Context, Poll, Waker};
81use futures::channel::oneshot;
82use futures::stream::{FuturesUnordered, StreamExt};
83use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
87    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
89    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
90    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
91};
92use wasmtime_environ::packed_option::ReservedValue;
93use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
94
95pub use abort::JoinHandle;
96pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod func;
108mod future_stream_any;
109mod futures_and_streams;
110pub(crate) mod table;
111pub(crate) mod tls;
112
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// The value has all 32 bits set (i.e. it equals `u32::MAX`).
const BLOCKED: u32 = 0xffff_ffff;
116
/// Corresponds to `CallState` in the upstream spec.
///
/// Each variant carries an explicit discriminant; that integer is what ends up
/// in the low bits of the value produced by [`Status::pack`].
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional `waitable` handle into the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// Layout: bits `0..4` hold the status discriminant and bits `4..32` hold
    /// the waitable handle (zero when there is none).
    ///
    /// # Panics
    ///
    /// - if `self` is [`Status::Returned`] and a waitable is supplied, or
    ///   `self` is any other status and no waitable is supplied;
    /// - if the waitable does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A waitable is present exactly when the call has not yet returned.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave room for the four status bits.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
140
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// `Event::parts` lowers a value of this type to the pair of integers handed
/// to the guest; the `pending` fields are not part of that pair.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event.
    None,
    /// A subtask event carrying the subtask's `status`.
    Subtask {
        status: Status,
    },
    /// A stream read event carrying its return `code`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably identifies the stream table and handle the
        // event is still pending for — confirm against the users of this field.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write event carrying its return `code`.
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read event carrying its return `code`.
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write event carrying its return `code`.
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A cancellation event.
    Cancelled,
}
167
168impl Event {
169    /// Lower this event to core Wasm integers for delivery to the guest.
170    ///
171    /// Note that the waitable handle, if any, is assumed to be lowered
172    /// separately.
173    fn parts(self) -> (u32, u32) {
174        const EVENT_NONE: u32 = 0;
175        const EVENT_SUBTASK: u32 = 1;
176        const EVENT_STREAM_READ: u32 = 2;
177        const EVENT_STREAM_WRITE: u32 = 3;
178        const EVENT_FUTURE_READ: u32 = 4;
179        const EVENT_FUTURE_WRITE: u32 = 5;
180        const EVENT_CANCELLED: u32 = 6;
181        match self {
182            Event::None => (EVENT_NONE, 0),
183            Event::Cancelled => (EVENT_CANCELLED, 0),
184            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189        }
190    }
191}
192
/// Corresponds to `CallbackCode` in the spec.
///
/// These are plain `u32` constants rather than an enum.
mod callback_code {
    /// Code `0`: exit.
    pub const EXIT: u32 = 0;
    /// Code `1`: yield.
    pub const YIELD: u32 = 1;
    /// Code `2`: wait.
    pub const WAIT: u32 = 2;
}
199
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// The value is re-exported from `wasmtime_environ` and converted to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// The store this access was created for.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's data `T` to `D::Data`, used by `get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
214
215impl<'a, T, D> Access<'a, T, D>
216where
217    D: HasData + ?Sized,
218    T: 'static,
219{
220    /// Creates a new [`Access`] from its component parts.
221    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222        Self { store, get_data }
223    }
224
225    /// Get mutable access to the store data.
226    pub fn data_mut(&mut self) -> &mut T {
227        self.store.data_mut()
228    }
229
230    /// Get mutable access to the store data.
231    pub fn get(&mut self) -> D::Data<'_> {
232        (self.get_data)(self.data_mut())
233    }
234
235    /// Spawn a background task.
236    ///
237    /// See [`Accessor::spawn`] for details.
238    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239    where
240        T: 'static,
241    {
242        let accessor = Accessor {
243            get_data: self.get_data,
244            token: StoreToken::new(self.store.as_context_mut()),
245        };
246        self.store
247            .as_context_mut()
248            .spawn_with_accessor(accessor, task)
249    }
250
251    /// Returns the getter this accessor is using to project from `T` into
252    /// `D::Data`.
253    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254        self.get_data
255    }
256}
257
// Lets an `Access` be used wherever a shared store context is expected by
// delegating to the wrapped `StoreContextMut`.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
269
// Lets an `Access` be used wherever a mutable store context is expected by
// delegating to the wrapped `StoreContextMut`.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
279
280/// Provides scoped mutable access to store data in the context of a concurrent
281/// host task future.
282///
283/// This allows multiple host task futures to execute concurrently and access
284/// the store between (but not across) `await` points.
285///
286/// # Rationale
287///
288/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
289/// `D::Data<'_>`. The problem this is solving, however, is that it does not
290/// literally store these values. The basic problem is that when a concurrent
291/// host future is being polled it has access to `&mut T` (and the whole
292/// `Store`) but when it's not being polled it does not have access to these
293/// values. This reflects how the store is only ever polling one future at a
294/// time so the store is effectively being passed between futures.
295///
296/// Rust's `Future` trait, however, has no means of passing a `Store`
297/// temporarily between futures. The [`Context`](core::task::Context) type does
298/// not have the ability to attach arbitrary information to it at this time.
299/// This type, [`Accessor`], is used to bridge this expressivity gap.
300///
301/// The [`Accessor`] type here represents the ability to acquire, temporarily in
302/// a synchronous manner, the current store. The [`Accessor::with`] function
303/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
304/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
307/// how the store is temporarily made available while a host future is being
308/// polled.
309///
310/// # Implementation
311///
312/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
313/// this type additionally doesn't even have a lifetime parameter. This is
314/// instead a representation of proof of the ability to acquire these while a
315/// future is being polled. Wasmtime will, when it polls a host future,
316/// configure ambient state such that the `Accessor` that a future closes over
317/// will work and be able to access the store.
318///
319/// This has a number of implications for users such as:
320///
321/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
322///   the lifetime of a single future.
323/// * A future is expected to, however, close over an `Accessor` and keep it
324///   alive probably for the duration of the entire future.
325/// * Different host futures will be given different `Accessor`s, and that's
326///   intentional.
327/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
328///   alleviates some otherwise required bounds to be written down.
329///
330/// # Using `Accessor` in `Drop`
331///
332/// The methods on `Accessor` are only expected to work in the context of
333/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
334/// host future can be dropped at any time throughout the system and Wasmtime
335/// store context is not necessarily available at that time. It's recommended to
336/// not use `Accessor` methods in anything connected to a `Drop` implementation
337/// as they will panic and have unintended results. If you run into this though
338/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by `with` to turn the ambient store into a
    /// `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store's data `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
346
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns a shared reference to the accessor that this value refers to.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
374
375impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376    type Data = T::Data;
377    type AccessorData = T::AccessorData;
378
379    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380        T::as_accessor(self)
381    }
382}
383
// An `Accessor` is trivially its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
392
393// Note that it is intentional at this time that `Accessor` does not actually
394// store `&mut T` or anything similar. This distinctly enables the `Accessor`
395// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
396// that matter). This is used to ergonomically simplify bindings where the
397// majority of the time `Accessor` is closed over in a future which then needs
398// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
399// you already have to write `T: 'static`...) it helps to avoid this.
400//
401// Note as well that `Accessor` doesn't actually store its data at all. Instead
402// it's more of a "proof" of what can be accessed from TLS. API design around
403// `Accessor` and functions like `Linker::func_wrap_concurrent` are
404// intentionally made to ensure that `Accessor` is ideally only used in the
405// context that TLS variables are actually set. For example host functions are
406// given `&Accessor`, not `Accessor`, and this prevents them from persisting
407// the value outside of a future. Within the future the TLS variables are all
408// guaranteed to be set while the future is being polled.
409//
410// Finally though this is not an ironclad guarantee, but nor does it need to be.
411// The TLS APIs are designed to panic or otherwise model usage where they're
412// called recursively or similar. It's hoped that code cannot be constructed to
413// actually hit this at runtime but this is not a safety requirement at this
414// time.
// Compile-time check that `Accessor` is `Send + Sync` even when its `T` is
// not: `UnsafeCell<u32>` is not `Sync`, so this only compiles if `Accessor`'s
// auto traits do not depend on `T`.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
419
impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by `token`.
    ///
    /// The resulting accessor uses the identity projection, i.e. its data
    /// getter returns the store's `T` itself (the default `D = HasSelf<T>`).
    /// Use [`Accessor::with_getter`] to obtain an accessor which projects to a
    /// different type.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
436
437impl<T, D> Accessor<T, D>
438where
439    D: HasData + ?Sized,
440{
441    /// Run the specified closure, passing it mutable access to the store.
442    ///
443    /// This function is one of the main building blocks of the [`Accessor`]
444    /// type. This yields synchronous, blocking, access to the store via an
445    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
446    /// providing the ability to access `D` via [`Access::get`]. Note that the
447    /// `fun` here is given only temporary access to the store and `T`/`D`
448    /// meaning that the return value `R` here is not allowed to capture borrows
449    /// into the two. If access is needed to data within `T` or `D` outside of
450    /// this closure then it must be `clone`d out, for example.
451    ///
452    /// # Panics
453    ///
454    /// This function will panic if it is call recursively with any other
455    /// accessor already in scope. For example if `with` is called within `fun`,
456    /// then this function will panic. It is up to the embedder to ensure that
457    /// this does not happen.
458    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459        tls::get(|vmstore| {
460            fun(Access {
461                store: self.token.as_context_mut(vmstore),
462                get_data: self.get_data,
463            })
464        })
465    }
466
467    /// Returns the getter this accessor is using to project from `T` into
468    /// `D::Data`.
469    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470        self.get_data
471    }
472
473    /// Changes this accessor to access `D2` instead of the current type
474    /// parameter `D`.
475    ///
476    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
477    ///
478    /// # Panics
479    ///
480    /// When using this API the returned value is disconnected from `&self` and
481    /// the lifetime binding the `self` argument. An `Accessor` only works
482    /// within the context of the closure or async closure that it was
483    /// originally given to, however. This means that due to the fact that the
484    /// returned value has no lifetime connection it's possible to use the
485    /// accessor outside of `&self`, the original accessor, and panic.
486    ///
487    /// The returned value should only be used within the scope of the original
488    /// `Accessor` that `self` refers to.
489    pub fn with_getter<D2: HasData>(
490        &self,
491        get_data: fn(&mut T) -> D2::Data<'_>,
492    ) -> Accessor<T, D2> {
493        Accessor {
494            token: self.token,
495            get_data,
496        }
497    }
498
499    /// Spawn a background task which will receive an `&Accessor<T, D>` and
500    /// run concurrently with any other tasks in progress for the current
501    /// store.
502    ///
503    /// This is particularly useful for host functions which return a `stream`
504    /// or `future` such that the code to write to the write end of that
505    /// `stream` or `future` must run after the function returns.
506    ///
507    /// The returned [`JoinHandle`] may be used to cancel the task.
508    ///
509    /// # Panics
510    ///
511    /// Panics if called within a closure provided to the [`Accessor::with`]
512    /// function. This can only be called outside an active invocation of
513    /// [`Accessor::with`].
514    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515    where
516        T: 'static,
517    {
518        let accessor = self.clone_for_spawn();
519        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520    }
521
522    fn clone_for_spawn(&self) -> Self {
523        Self {
524            token: self.token,
525            get_data: self.get_data,
526        }
527    }
528
529    /// Polls to see if this store contains any "interesting" tasks still within
530    /// it.
531    ///
532    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
533    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
534    /// the last remaining "interesting" task has exited the provided context's
535    /// waker will be notified. Note that only the waker passed to the last call
536    /// to `poll_no_interesting_tasks` will be notified, so this is only
537    /// appropriate to use one-at-a-time.
538    ///
539    /// The component model specification, as of this current date, does not
540    /// have a distinction between "interesting" tasks and not. The current
541    /// intention is that in a future revision of the component model this will
542    /// be distinguished at the component ABI level where tasks will be able to
543    /// flag themselves as "interesting" optionally. Additionally extra work can
544    /// be opted-in to being "interesting".
545    ///
546    /// For now what this means is that all component model tasks within this
547    /// store are considered interesting. This specifically includes the entire
548    /// duration of a task, so even all of the time after a task has returned
549    /// but before it has exited. This means that this function is, today,
550    /// effectively a proxy for "are there any more tasks still running in this
551    /// store". This can be used by embedders to determine whether there's any
552    /// more work going on, even in the background, for a particular guest.
553    /// Hosts can use this as a signal that the guest wants to stay alive a
554    /// little longer, even after a task has returned.
555    ///
556    /// In the future this predicate won't include all tasks in this store. Some
557    /// tasks will be able to flag themselves as not interesting, meaning that
558    /// when this returns ready it'd be possible that there are still tasks
559    /// remaining in the store.
560    ///
561    /// Note that at this time spawned threads within a task are always
562    /// considered uninteresting. If this function returns ready, then spawned
563    /// threads may still be in the store.
564    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
565        self.with(|mut access| {
566            let store = access.as_context_mut().0;
567            let state = store.concurrent_state_mut();
568            if state.interesting_tasks == 0 {
569                Poll::Ready(())
570            } else {
571                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
572                Poll::Pending
573            }
574        })
575    }
576}
577
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `core::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task, consuming it.
    ///
    /// The returned future must be `Send` and resolves to `Ok(())` on success
    /// or an error otherwise.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
596
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw parameter values supplied by the caller.
        params: Vec<ValRaw>,
        /// Whether the call has a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw parameter values supplied by the caller.
        params: Vec<ValRaw>,
        /// Number of results of the call.
        result_count: u32,
    },
}
611
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`; the payload is the
    /// suspended fiber.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback(Instance),
}
620
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The thread which is waiting.
        thread: QualifiedThreadId,
        // NOTE(review): presumably bypasses the "may block" validation when
        // suspending — confirm against the code which consumes this flag.
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        /// The thread which is yielding.
        thread: QualifiedThreadId,
        // NOTE(review): presumably whether the yield may be interrupted by
        // cancellation — confirm against the code which consumes this flag.
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        /// The thread which is suspending.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
647
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Like `StartImplicit`, a one-shot closure which is run with the store,
    /// except that it returns `Result<()>` and so never yields a follow-up
    /// call.
    ///
    /// `GuestCall::is_ready` treats this kind as always ready.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
669
670impl fmt::Debug for GuestCallKind {
671    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
672        match self {
673            Self::DeliverEvent { instance, set } => f
674                .debug_struct("DeliverEvent")
675                .field("instance", instance)
676                .field("set", set)
677                .finish(),
678            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
679            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
680        }
681    }
682}
683
/// The target of a suspension intrinsic.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// Returns `true` for every variant which carries a target, i.e. anything
    /// other than [`SuspensionTarget::None`].
    fn is_some(&self) -> bool {
        match self {
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => true,
            SuspensionTarget::None => false,
        }
    }
}
700
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread the call is for.
    thread: QualifiedThreadId,
    /// What kind of call is pending.
    kind: GuestCallKind,
}
707
708impl GuestCall {
709    /// Returns whether or not the call is ready to run.
710    ///
711    /// A call will not be ready to run if either:
712    ///
713    /// - the (sub-)component instance to be called has already been entered and
714    /// cannot be reentered until an in-progress call completes
715    ///
716    /// - the call is for a not-yet started task and the (sub-)component
717    /// instance to be called has backpressure enabled
718    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
719        let instance = store
720            .concurrent_state_mut()
721            .get_mut(self.thread.task)?
722            .instance;
723        let state = store.instance_state(instance).concurrent_state();
724
725        let ready = match &self.kind {
726            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
727            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
728            GuestCallKind::StartExplicit(_) => true,
729        };
730        log::trace!(
731            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
732            state.do_not_enter,
733            state.backpressure
734        );
735        Ok(ready)
736    }
737}
738
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure to run with access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
744
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume, along with a runtime component instance index.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task, along with a
    /// runtime component instance index.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
759
760impl fmt::Debug for WorkItem {
761    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
762        match self {
763            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
764            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
765            Self::ResumeThread(instance, thread) => f
766                .debug_tuple("ResumeThread")
767                .field(instance)
768                .field(thread)
769                .finish(),
770            Self::GuestCall(instance, call) => f
771                .debug_tuple("GuestCall")
772                .field(instance)
773                .field(call)
774                .finish(),
775            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
776        }
777    }
778}
779
/// Outcome of a suspension intrinsic: it was either cancelled or it
/// completed.
///
/// The derived ordering follows declaration order, so `Cancelled` compares
/// less than `Completed`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
786
787/// Poll the specified future until it completes on behalf of a guest->host call
788/// using a sync-lowered import.
789///
790/// This is similar to `Instance::first_poll` except it's for sync-lowered
791/// imports, meaning we don't need to handle cancellation and we can block the
792/// caller until the task completes, at which point the caller can handle
793/// lowering the result to the guest's stack and linear memory.
794pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
795    store: &mut dyn VMStore,
796    future: impl Future<Output = Result<R>> + Send + 'static,
797) -> Result<R> {
798    let state = store.concurrent_state_mut();
799    let task = state.current_host_thread()?;
800
801    // Wrap the future in a closure which will take care of stashing the result
802    // in `GuestTask::result` and resuming this fiber when the host task
803    // completes.
804    let mut future = Box::pin(async move {
805        let result = future.await?;
806        tls::get(move |store| {
807            let state = store.concurrent_state_mut();
808            let host_state = &mut state.get_mut(task)?.state;
809            assert!(matches!(host_state, HostTaskState::CalleeStarted));
810            *host_state = HostTaskState::CalleeFinished(Box::new(result));
811
812            Waitable::Host(task).set_event(
813                state,
814                Some(Event::Subtask {
815                    status: Status::Returned,
816                }),
817            )?;
818
819            Ok(())
820        })
821    }) as HostTaskFuture;
822
823    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
824    // add the future to `ConcurrentState::futures` and poll it automatically
825    // from the event loop if it doesn't complete immediately here.
826    let poll = tls::set(store, || {
827        future
828            .as_mut()
829            .poll(&mut Context::from_waker(&Waker::noop()))
830    });
831
832    match poll {
833        // It completed immediately; check the result and delete the task.
834        Poll::Ready(result) => result?,
835
836        // It did not complete immediately; add it to
837        // `ConcurrentState::futures` so it will be polled via the event loop;
838        // then use `GuestThread::sync_call_set` to wait for the task to
839        // complete, suspending the current fiber until it does so.
840        Poll::Pending => {
841            let state = store.concurrent_state_mut();
842            state.push_future(future);
843
844            let caller = state.get_mut(task)?.caller;
845            let set = state.get_mut(caller.thread)?.sync_call_set;
846            Waitable::Host(task).join(state, Some(set))?;
847
848            store.suspend(SuspendReason::Waiting {
849                set,
850                thread: caller,
851                skip_may_block_check: false,
852            })?;
853
854            // Remove the `task` from the `sync_call_set` to ensure that when
855            // this function returns and the task is deleted that there are no
856            // more lingering references to this host task.
857            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
858        }
859    }
860
861    // Retrieve and return the result.
862    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
863    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
864        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
865            Ok(result) => *result,
866            Err(_) => bail_bug!("host task finished with wrong type of result"),
867        }),
868        _ => bail_bug!("unexpected host task state after completion"),
869    }
870}
871
872/// Execute the specified guest call.
873fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
874    let mut next = Some(call);
875    while let Some(call) = next.take() {
876        match call.kind {
877            GuestCallKind::DeliverEvent { instance, set } => {
878                let (event, waitable) =
879                    match instance.get_event(store, call.thread.task, set, true)? {
880                        Some(pair) => pair,
881                        None => bail_bug!("delivering non-present event"),
882                    };
883                let state = store.concurrent_state_mut();
884                let task = state.get_mut(call.thread.task)?;
885                let runtime_instance = task.instance;
886                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
887
888                log::trace!(
889                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
890                    call.thread,
891                );
892
893                let old_thread = store.set_thread(call.thread)?;
894                log::trace!(
895                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
896                    call.thread
897                );
898
899                store.enter_instance(runtime_instance);
900
901                let Some(callback) = store
902                    .concurrent_state_mut()
903                    .get_mut(call.thread.task)?
904                    .callback
905                    .take()
906                else {
907                    bail_bug!("guest task callback field not present")
908                };
909
910                let code = callback(store, event, handle)?;
911
912                store
913                    .concurrent_state_mut()
914                    .get_mut(call.thread.task)?
915                    .callback = Some(callback);
916
917                store.exit_instance(runtime_instance)?;
918
919                store.set_thread(old_thread)?;
920
921                next = instance.handle_callback_code(
922                    store,
923                    call.thread,
924                    runtime_instance.index,
925                    code,
926                )?;
927
928                log::trace!(
929                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
930                );
931            }
932            GuestCallKind::StartImplicit(fun) => {
933                next = fun(store)?;
934            }
935            GuestCallKind::StartExplicit(fun) => {
936                fun(store)?;
937            }
938        }
939    }
940
941    Ok(())
942}
943
944impl<T> Store<T> {
945    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
946    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
947    where
948        T: Send + 'static,
949    {
950        ensure!(
951            self.as_context().0.concurrency_support(),
952            "cannot use `run_concurrent` when Config::concurrency_support disabled",
953        );
954        self.as_context_mut().run_concurrent(fun).await
955    }
956
957    #[doc(hidden)]
958    pub fn assert_concurrent_state_empty(&mut self) {
959        self.as_context_mut().assert_concurrent_state_empty();
960    }
961
962    #[doc(hidden)]
963    pub fn concurrent_state_table_size(&mut self) -> usize {
964        self.as_context_mut().concurrent_state_table_size()
965    }
966
967    /// Convenience wrapper for [`StoreContextMut::spawn`].
968    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
969    where
970        T: 'static,
971    {
972        self.as_context_mut().spawn(task)
973    }
974}
975
976impl<T> StoreContextMut<'_, T> {
977    /// Assert that all the relevant tables and queues in the concurrent state
978    /// for this store are empty.
979    ///
980    /// This is for sanity checking in integration tests
981    /// (e.g. `component-async-tests`) that the relevant state has been cleared
982    /// after each test concludes.  This should help us catch leaks, e.g. guest
983    /// tasks which haven't been deleted despite having completed and having
984    /// been dropped by their supertasks.
985    ///
986    /// Only intended for use in Wasmtime's own testing.
987    #[doc(hidden)]
988    pub fn assert_concurrent_state_empty(self) {
989        let store = self.0;
990        store
991            .store_data_mut()
992            .components
993            .assert_instance_states_empty();
994        let state = store.concurrent_state_mut();
995        assert!(
996            state.table.get_mut().is_empty(),
997            "non-empty table: {:?}",
998            state.table.get_mut()
999        );
1000        assert!(state.high_priority.is_empty());
1001        assert!(state.low_priority.is_empty());
1002        assert!(state.current_thread.is_none());
1003        assert!(state.futures_mut().unwrap().is_empty());
1004        assert!(state.global_error_context_ref_counts.is_empty());
1005    }
1006
1007    /// Helper function to perform tests over the size of the concurrent state
1008    /// table which can be useful for detecting leaks.
1009    ///
1010    /// Only intended for use in Wasmtime's own testing.
1011    #[doc(hidden)]
1012    pub fn concurrent_state_table_size(&mut self) -> usize {
1013        self.0
1014            .concurrent_state_mut()
1015            .table
1016            .get_mut()
1017            .iter_mut()
1018            .count()
1019    }
1020
1021    /// Spawn a background task to run as part of this instance's event loop.
1022    ///
1023    /// The task will receive an `&Accessor<U>` and run concurrently with
1024    /// any other tasks in progress for the instance.
1025    ///
1026    /// Note that the task will only make progress if and when the event loop
1027    /// for this instance is run.
1028    ///
1029    /// The returned [`JoinHandle`] may be used to cancel the task.
1030    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1031    where
1032        T: 'static,
1033    {
1034        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1035        self.spawn_with_accessor(accessor, task)
1036    }
1037
1038    /// Internal implementation of `spawn` functions where a `store` is
1039    /// available along with an `Accessor`.
1040    fn spawn_with_accessor<D>(
1041        self,
1042        accessor: Accessor<T, D>,
1043        task: impl AccessorTask<T, D>,
1044    ) -> JoinHandle
1045    where
1046        T: 'static,
1047        D: HasData + ?Sized,
1048    {
1049        // Create an "abortable future" here where internally the future will
1050        // hook calls to poll and possibly spawn more background tasks on each
1051        // iteration.
1052        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1053        self.0
1054            .concurrent_state_mut()
1055            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1056        handle
1057    }
1058
1059    /// Run the specified closure `fun` to completion as part of this store's
1060    /// event loop.
1061    ///
1062    /// This will run `fun` as part of this store's event loop until it
1063    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1064    /// controlled access to the store and its data.
1065    ///
1066    /// This function can be used to invoke [`Func::call_concurrent`] for
1067    /// example within the async closure provided here.
1068    ///
1069    /// This function will unconditionally return an error if
1070    /// [`Config::concurrency_support`] is disabled.
1071    ///
1072    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1073    ///
1074    /// # Store-blocking behavior
1075    ///
1076    /// At this time there are certain situations in which the `Future` returned
1077    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1078    /// extended period of time, despite one or more `Waker::wake` events having
1079    /// occurred for the task to which it belongs.  This can manifest as the
1080    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1081    /// the `Store` being held by e.g. a blocking host function, preventing the
1082    /// `Future` from being polled. A canonical example of this is when the
1083    /// `fun` provided to this function attempts to set a timeout for an
1084    /// invocation of a wasm function. In this situation the async closure is
1085    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1086    /// to elapse. At this time this setup will not always work and the timeout
1087    /// may not reliably fire.
1088    ///
1089    /// This function will not block the current thread and as such is always
1090    /// suitable to run in an `async` context, but the current implementation of
1091    /// Wasmtime can lead to situations where a certain wasm computation is
1092    /// required to make progress the closure to make progress. This is an
1093    /// artifact of Wasmtime's historical implementation of `async` functions
1094    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1095    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1096    /// progress for a readiness notification of (b) to get delivered.
1097    ///
1098    /// This effectively means that it's not possible to reliably perform a
1099    /// "select" operation within the `fun` closure, which timeouts for example
1100    /// are based on. Fixing this requires some relatively major refactoring
1101    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1102    /// that is intended to be fixed one day. In the meantime it's recommended
1103    /// to apply timeouts or such to the entire `run_concurrent` call itself
1104    /// rather than internally.
1105    ///
1106    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1107    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1108    ///
1109    /// # Example
1110    ///
1111    /// ```
1112    /// # use {
1113    /// #   wasmtime::{
1114    /// #     error::{Result},
1115    /// #     component::{ Component, Linker, Resource, ResourceTable},
1116    /// #     Config, Engine, Store
1117    /// #   },
1118    /// # };
1119    /// #
1120    /// # struct MyResource(u32);
1121    /// # struct Ctx { table: ResourceTable }
1122    /// #
1123    /// # async fn foo() -> Result<()> {
1124    /// # let mut config = Config::new();
1125    /// # let engine = Engine::new(&config)?;
1126    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1127    /// # let mut linker = Linker::new(&engine);
1128    /// # let component = Component::new(&engine, "")?;
1129    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1130    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1131    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1132    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1133    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1134    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1135    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1136    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1137    ///    Ok(())
1138    /// }).await??;
1139    /// # Ok(())
1140    /// # }
1141    /// ```
1142    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1143    where
1144        T: Send + 'static,
1145    {
1146        ensure!(
1147            self.0.concurrency_support(),
1148            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1149        );
1150        self.do_run_concurrent(fun, false).await
1151    }
1152
1153    pub(super) async fn run_concurrent_trap_on_idle<R>(
1154        self,
1155        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1156    ) -> Result<R>
1157    where
1158        T: Send + 'static,
1159    {
1160        self.do_run_concurrent(fun, true).await
1161    }
1162
1163    async fn do_run_concurrent<R>(
1164        mut self,
1165        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1166        trap_on_idle: bool,
1167    ) -> Result<R>
1168    where
1169        T: Send + 'static,
1170    {
1171        debug_assert!(self.0.concurrency_support());
1172        check_recursive_run();
1173        let token = StoreToken::new(self.as_context_mut());
1174
1175        struct Dropper<'a, T: 'static, V> {
1176            store: StoreContextMut<'a, T>,
1177            value: ManuallyDrop<V>,
1178        }
1179
1180        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1181            fn drop(&mut self) {
1182                tls::set(self.store.0, || {
1183                    // SAFETY: Here we drop the value without moving it for the
1184                    // first and only time -- per the contract for `Drop::drop`,
1185                    // this code won't run again, and the `value` field will no
1186                    // longer be accessible.
1187                    unsafe { ManuallyDrop::drop(&mut self.value) }
1188                });
1189            }
1190        }
1191
1192        let accessor = &Accessor::new(token);
1193        let dropper = &mut Dropper {
1194            store: self,
1195            value: ManuallyDrop::new(fun(accessor)),
1196        };
1197        // SAFETY: We never move `dropper` nor its `value` field.
1198        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1199
1200        dropper
1201            .store
1202            .as_context_mut()
1203            .poll_until(future, trap_on_idle)
1204            .await
1205    }
1206
1207    /// Run this store's event loop.
1208    ///
1209    /// The returned future will resolve when the specified future completes or,
1210    /// if `trap_on_idle` is true, when the event loop can't make further
1211    /// progress.
1212    async fn poll_until<R>(
1213        mut self,
1214        mut future: Pin<&mut impl Future<Output = R>>,
1215        trap_on_idle: bool,
1216    ) -> Result<R>
1217    where
1218        T: Send + 'static,
1219    {
1220        struct Reset<'a, T: 'static> {
1221            store: StoreContextMut<'a, T>,
1222            futures: Option<FuturesUnordered<HostTaskFuture>>,
1223        }
1224
1225        impl<'a, T> Drop for Reset<'a, T> {
1226            fn drop(&mut self) {
1227                if let Some(futures) = self.futures.take() {
1228                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1229                }
1230            }
1231        }
1232
1233        loop {
1234            // Take `ConcurrentState::futures` out of the store so we can poll
1235            // it while also safely giving any of the futures inside access to
1236            // `self`.
1237            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1238            let mut reset = Reset {
1239                store: self.as_context_mut(),
1240                futures,
1241            };
1242            let mut next = match reset.futures.as_mut() {
1243                Some(f) => pin!(f.next()),
1244                None => bail_bug!("concurrent state missing futures field"),
1245            };
1246
1247            enum PollResult<R> {
1248                Complete(R),
1249                ProcessWork {
1250                    ready: Vec<WorkItem>,
1251                    low_priority: bool,
1252                },
1253            }
1254
1255            let result = future::poll_fn(|cx| {
1256                // First, poll the future we were passed as an argument and
1257                // return immediately if it's ready.
1258                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1259                    return Poll::Ready(Ok(PollResult::Complete(value)));
1260                }
1261
1262                // Next, poll `ConcurrentState::futures` (which includes any
1263                // pending host tasks and/or background tasks), returning
1264                // immediately if one of them fails.
1265                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1266                    Poll::Ready(Some(output)) => {
1267                        match output {
1268                            Err(e) => return Poll::Ready(Err(e)),
1269                            Ok(()) => {}
1270                        }
1271                        Poll::Ready(true)
1272                    }
1273                    Poll::Ready(None) => Poll::Ready(false),
1274                    Poll::Pending => Poll::Pending,
1275                };
1276
1277                // Next, collect the next batch of work items to process, if
1278                // any.  This will be either all of the high-priority work
1279                // items, or if there are none, a single low-priority work item.
1280                let state = reset.store.0.concurrent_state_mut();
1281                let mut ready = mem::take(&mut state.high_priority);
1282                let mut low_priority = false;
1283                if ready.is_empty() {
1284                    if let Some(item) = state.low_priority.pop_back() {
1285                        ready.push(item);
1286                        low_priority = true;
1287                    }
1288                }
1289                if !ready.is_empty() {
1290                    return Poll::Ready(Ok(PollResult::ProcessWork {
1291                        ready,
1292                        low_priority,
1293                    }));
1294                }
1295
1296                // Finally, if we have nothing else to do right now, determine what to do
1297                // based on whether there are any pending futures in
1298                // `ConcurrentState::futures`.
1299                return match next {
1300                    Poll::Ready(true) => {
1301                        // In this case, one of the futures in
1302                        // `ConcurrentState::futures` completed
1303                        // successfully, so we return now and continue
1304                        // the outer loop in case there is another one
1305                        // ready to complete.
1306                        Poll::Ready(Ok(PollResult::ProcessWork {
1307                            ready: Vec::new(),
1308                            low_priority: false,
1309                        }))
1310                    }
1311                    Poll::Ready(false) => {
1312                        // Poll the future we were passed one last time
1313                        // in case one of `ConcurrentState::futures` had
1314                        // the side effect of unblocking it.
1315                        if let Poll::Ready(value) =
1316                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1317                        {
1318                            Poll::Ready(Ok(PollResult::Complete(value)))
1319                        } else {
1320                            // In this case, there are no more pending
1321                            // futures in `ConcurrentState::futures`,
1322                            // there are no remaining work items, _and_
1323                            // the future we were passed as an argument
1324                            // still hasn't completed.
1325                            if trap_on_idle {
1326                                // `trap_on_idle` is true, so we exit
1327                                // immediately.
1328                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1329                            } else {
1330                                // `trap_on_idle` is false, so we assume
1331                                // that future will wake up and give us
1332                                // more work to do when it's ready to.
1333                                Poll::Pending
1334                            }
1335                        }
1336                    }
1337                    // There is at least one pending future in
1338                    // `ConcurrentState::futures` and we have nothing
1339                    // else to do but wait for now, so we return
1340                    // `Pending`.
1341                    Poll::Pending => Poll::Pending,
1342                };
1343            })
1344            .await;
1345
1346            // Put the `ConcurrentState::futures` back into the store before we
1347            // return or handle any work items since one or more of those items
1348            // might append more futures.
1349            drop(reset);
1350
1351            match result? {
1352                // The future we were passed as an argument completed, so we
1353                // return the result.
1354                PollResult::Complete(value) => break Ok(value),
1355                // The future we were passed has not yet completed, so handle
1356                // any work items and then loop again.
1357                PollResult::ProcessWork {
1358                    ready,
1359                    low_priority,
1360                } => {
1361                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1362                        store: StoreContextMut<'a, T>,
1363                        ready: I,
1364                    }
1365
1366                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1367                        fn drop(&mut self) {
1368                            while let Some(item) = self.ready.next() {
1369                                match item {
1370                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1371                                    WorkItem::PushFuture(future) => {
1372                                        tls::set(self.store.0, move || drop(future))
1373                                    }
1374                                    _ => {}
1375                                }
1376                            }
1377                        }
1378                    }
1379
1380                    let mut dispose = Dispose {
1381                        store: self.as_context_mut(),
1382                        ready: ready.into_iter(),
1383                    };
1384
1385                    // If we're about to run a low-priority task, first yield to
1386                    // the executor.  This ensures that it won't be starved of
1387                    // the ability to e.g. update the readiness of sockets,
1388                    // etc. which the guest may be using `thread.yield` along
1389                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
1390                    //
1391                    // This works for e.g. `thread.yield` and callbacks which
1392                    // return `CALLBACK_CODE_YIELD` because we queue a low
1393                    // priority item to resume the task (i.e. resume the thread
1394                    // or call the callback, respectively) just prior to
1395                    // suspending it.  Indeed, as of this writing those are the
1396                    // _only_ situations we queue low-priority tasks.
1397                    // Therefore, we interpret the guest's request to yield as
1398                    // meaning "yield to other guest tasks _and_/_or_ host
1399                    // operations such as updating socket readiness", the latter
1400                    // being the async runtime's responsibility.
1401                    //
1402                    // In the future, if this ends up causing measurable
1403                    // performance issues, this could be optimized such that we
1404                    // only yield periodically (e.g. for batches of low priority
1405                    // items) and not for each and every idividual item.
1406                    if low_priority {
1407                        dispose.store.0.yield_now().await
1408                    }
1409
1410                    while let Some(item) = dispose.ready.next() {
1411                        dispose
1412                            .store
1413                            .as_context_mut()
1414                            .handle_work_item(item)
1415                            .await?;
1416                    }
1417                }
1418            }
1419        }
1420    }
1421
1422    /// Handle the specified work item, possibly resuming a fiber if applicable.
1423    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1424    where
1425        T: Send,
1426    {
1427        log::trace!("handle work item {item:?}");
1428        match item {
1429            WorkItem::PushFuture(future) => {
1430                self.0
1431                    .concurrent_state_mut()
1432                    .futures_mut()?
1433                    .push(future.into_inner());
1434            }
1435            WorkItem::ResumeFiber(fiber) => {
1436                self.0.resume_fiber(fiber).await?;
1437            }
1438            WorkItem::ResumeThread(_, thread) => {
1439                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1440                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1441                    GuestThreadState::Running,
1442                ) {
1443                    self.0.resume_fiber(fiber).await?;
1444                } else {
1445                    bail_bug!("cannot resume non-pending thread {thread:?}");
1446                }
1447            }
1448            WorkItem::GuestCall(_, call) => {
1449                if call.is_ready(self.0)? {
1450                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1451                } else {
1452                    let state = self.0.concurrent_state_mut();
1453                    let task = state.get_mut(call.thread.task)?;
1454                    if !task.starting_sent {
1455                        task.starting_sent = true;
1456                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1457                            Waitable::Guest(call.thread.task).set_event(
1458                                state,
1459                                Some(Event::Subtask {
1460                                    status: Status::Starting,
1461                                }),
1462                            )?;
1463                        }
1464                    }
1465
1466                    let instance = state.get_mut(call.thread.task)?.instance;
1467                    self.0
1468                        .instance_state(instance)
1469                        .concurrent_state()
1470                        .pending
1471                        .insert(call.thread, call.kind);
1472                }
1473            }
1474            WorkItem::WorkerFunction(fun) => {
1475                self.run_on_worker(WorkerItem::Function(fun)).await?;
1476            }
1477        }
1478
1479        Ok(())
1480    }
1481
1482    /// Execute the specified guest call on a worker fiber.
1483    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1484    where
1485        T: Send,
1486    {
1487        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1488            fiber
1489        } else {
1490            fiber::make_fiber(self.0, move |store| {
1491                loop {
1492                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1493                        bail_bug!("worker_item not present when resuming fiber")
1494                    };
1495                    match item {
1496                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1497                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1498                    }
1499
1500                    store.suspend(SuspendReason::NeedWork)?;
1501                }
1502            })?
1503        };
1504
1505        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1506        assert!(worker_item.is_none());
1507        *worker_item = Some(item);
1508
1509        self.0.resume_fiber(worker).await
1510    }
1511
1512    /// Wrap the specified host function in a future which will call it, passing
1513    /// it an `&Accessor<T>`.
1514    ///
1515    /// See the `Accessor` documentation for details.
1516    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1517    where
1518        T: 'static,
1519        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1520            + Send
1521            + Sync
1522            + 'static,
1523        R: Send + Sync + 'static,
1524    {
1525        let token = StoreToken::new(self);
1526        async move {
1527            let mut accessor = Accessor::new(token);
1528            closure(&mut accessor).await
1529        }
1530    }
1531
1532    /// Returns an iterator over the current async call stack defined by the
1533    /// component model.
1534    ///
1535    /// This can be used, for example to correlate a host import call with which
1536    /// root export task originally called it.
1537    ///
1538    /// Tasks are yielded "youngest first" where the first item in the iterator
1539    /// is the current task, and the last item in the iterator is the original
1540    /// call.
1541    pub fn async_call_stack(&mut self) -> impl Iterator<Item = GuestTaskId> {
1542        let state = self.0.concurrent_state_mut();
1543        let mut cur = Some(state.current_thread);
1544        core::iter::from_fn(move || {
1545            while let Some(t) = cur {
1546                cur = state.parent(t);
1547                if let Some(thread) = t.guest() {
1548                    return Some(GuestTaskId(thread.task));
1549                }
1550            }
1551
1552            None
1553        })
1554    }
1555}
1556
1557impl StoreOpaque {
1558    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1559    /// guest-to-guest call or a sync host-to-guest call.
1560    ///
1561    /// This task will only be used for the purpose of handling calls to
1562    /// intrinsic functions; both parameter lowering and result lifting are
1563    /// assumed to be taken care of elsewhere.
1564    pub(crate) fn enter_guest_sync_call(
1565        &mut self,
1566        guest_caller: Option<RuntimeInstance>,
1567        callee_async: bool,
1568        callee: RuntimeInstance,
1569    ) -> Result<()> {
1570        log::trace!("enter sync call {callee:?}");
1571        if !self.concurrency_support() {
1572            return self.enter_call_not_concurrent();
1573        }
1574
1575        let state = self.concurrent_state_mut();
1576        let thread = state.current_thread;
1577        let instance = if let Some(thread) = thread.guest() {
1578            Some(state.get_mut(thread.task)?.instance)
1579        } else {
1580            None
1581        };
1582        if guest_caller.is_some() {
1583            debug_assert_eq!(instance, guest_caller);
1584        }
1585        let guest_thread = GuestTask::new(
1586            state,
1587            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1588            LiftResult {
1589                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1590                ty: TypeTupleIndex::reserved_value(),
1591                memory: None,
1592                string_encoding: StringEncoding::Utf8,
1593            },
1594            if let Some(thread) = thread.guest() {
1595                Caller::Guest { thread: *thread }
1596            } else {
1597                Caller::Host {
1598                    tx: None,
1599                    host_future_present: false,
1600                    caller: thread,
1601                }
1602            },
1603            None,
1604            callee,
1605            callee_async,
1606        )?;
1607
1608        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1609            guest_thread.thread,
1610            self,
1611            callee.index,
1612        )?;
1613        self.set_thread(guest_thread)?;
1614
1615        Ok(())
1616    }
1617
1618    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1619    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1620        if !self.concurrency_support() {
1621            return Ok(self.exit_call_not_concurrent());
1622        }
1623        let thread = match self.set_thread(CurrentThread::None)?.guest() {
1624            Some(t) => *t,
1625            None => bail_bug!("expected task when exiting"),
1626        };
1627        let task = self.concurrent_state_mut().get_mut(thread.task)?;
1628        let instance = task.instance;
1629        let caller = match &task.caller {
1630            &Caller::Guest { thread } => thread.into(),
1631            &Caller::Host { caller, .. } => caller,
1632        };
1633        task.lift_result = None;
1634        task.exited = true;
1635        self.set_thread(caller)?;
1636
1637        log::trace!("exit sync call {instance:?}");
1638        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1639
1640        Ok(())
1641    }
1642
1643    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1644    /// transition to the host.
1645    ///
1646    /// FIXME: this is called for all guest->host transitions and performs some
1647    /// relatively expensive table manipulations. This would ideally be
1648    /// optimized to avoid the full allocation of a `HostTask` in at least some
1649    /// situations.
1650    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1651        if !self.concurrency_support() {
1652            self.enter_call_not_concurrent()?;
1653            return Ok(None);
1654        }
1655        let state = self.concurrent_state_mut();
1656        let caller = state.current_guest_thread()?;
1657        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1658        log::trace!("new host task {task:?}");
1659        self.set_thread(task)?;
1660        Ok(Some(task))
1661    }
1662
1663    /// Invoked before lowering the results of a host task to the guest.
1664    ///
1665    /// This is used to update the current thread annotations within the store
1666    /// to ensure that it reflects the guest task, not the host task, since
1667    /// lowering may execute guest code.
1668    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1669        if !self.concurrency_support() {
1670            return Ok(());
1671        }
1672        let task = self.concurrent_state_mut().current_host_thread()?;
1673        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1674        self.set_thread(caller)?;
1675        Ok(())
1676    }
1677
1678    /// Dual of `host_task_create` and signifies that the host has finished and
1679    /// will be cleaned up.
1680    ///
1681    /// Note that this isn't invoked when the host is invoked asynchronously and
1682    /// the host isn't complete yet. In that situation the host task persists
1683    /// and will be cleaned up separately in `subtask_drop`
1684    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1685        match task {
1686            Some(task) => {
1687                log::trace!("delete host task {task:?}");
1688                self.concurrent_state_mut().delete(task)?;
1689            }
1690            None => {
1691                self.exit_call_not_concurrent();
1692            }
1693        }
1694        Ok(())
1695    }
1696
1697    /// Determine whether the specified instance may be entered from the host.
1698    ///
1699    /// We return `true` here only if all of the following hold:
1700    ///
1701    /// - The top-level instance is not already on the current task's call stack.
1702    /// - The instance is not in need of a post-return function call.
1703    /// - `self` has not been poisoned due to a trap.
1704    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1705        if self.trapped() {
1706            return Ok(false);
1707        }
1708        if !self.concurrency_support() {
1709            return Ok(true);
1710        }
1711        let state = self.concurrent_state_mut();
1712        let mut cur = Some(state.current_thread);
1713        while let Some(t) = cur {
1714            if let Some(thread) = t.guest() {
1715                let task = state.get_mut(thread.task)?;
1716                // Note that we only compare top-level instance IDs here.
1717                // The idea is that the host is not allowed to recursively
1718                // enter a top-level instance even if the specific leaf
1719                // instance is not on the stack. This the behavior defined
1720                // in the spec, and it allows us to elide runtime checks in
1721                // guest-to-guest adapters.
1722                if task.instance.instance == instance.instance {
1723                    return Ok(false);
1724                }
1725            }
1726            cur = state.parent(t);
1727        }
1728        Ok(true)
1729    }
1730
1731    /// Helper function to retrieve the `InstanceState` for the
1732    /// specified instance.
1733    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1734        self.component_instance_mut(instance.instance)
1735            .instance_state(instance.index)
1736    }
1737
1738    /// Configure the currently running `thread`.
1739    ///
1740    /// This will save off any state necessary for the previous thread, if
1741    /// applicable, and then it'll additionally update state for `thread` if
1742    /// needed too.
1743    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1744        let thread = thread.into();
1745        let state = self.concurrent_state_mut();
1746        let old_thread = mem::replace(&mut state.current_thread, thread);
1747
1748        // First thing to do after swapping threads is updating the context
1749        // slots for this thread within the store. This restores the behavior of
1750        // `context.{get,set}`. This involves taking the old state out of the
1751        // store, saving it in the thread that's being swapped from, and doing
1752        // the inverse for the new thread. When debug assertions are enabled
1753        // this also leaves behind sentinel values to try to uncover bugs where
1754        // this may be forgotten.
1755        if let Some(old_thread) = old_thread.guest() {
1756            let old_context = self.vm_store_context().component_context;
1757            self.concurrent_state_mut()
1758                .get_mut(old_thread.thread)?
1759                .context = old_context;
1760        }
1761        if cfg!(debug_assertions) {
1762            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1763        }
1764        if let Some(thread) = thread.guest() {
1765            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
1766            let context = thread.context;
1767            if cfg!(debug_assertions) {
1768                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1769            }
1770            self.vm_store_context_mut().component_context = context;
1771        }
1772
1773        // Each time we switch threads, we conservatively set `task_may_block`
1774        // to `false` for the component instance we're switching away from (if
1775        // any), meaning it will be `false` for any new thread created for that
1776        // instance unless explicitly set otherwise.
1777        //
1778        // Additionally if we're switching to a new thread, set its component
1779        // instance's `task_may_block` according to where it left off.
1780        let state = self.concurrent_state_mut();
1781        if let Some(old_thread) = old_thread.guest() {
1782            let instance = state.get_mut(old_thread.task)?.instance.instance;
1783            self.component_instance_mut(instance)
1784                .set_task_may_block(false)
1785        }
1786
1787        if thread.guest().is_some() {
1788            self.set_task_may_block()?;
1789        }
1790
1791        Ok(old_thread)
1792    }
1793
1794    /// Set the global variable representing whether the current task may block
1795    /// prior to entering Wasm code.
1796    fn set_task_may_block(&mut self) -> Result<()> {
1797        let state = self.concurrent_state_mut();
1798        let guest_thread = state.current_guest_thread()?;
1799        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1800        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1801        self.component_instance_mut(instance)
1802            .set_task_may_block(may_block);
1803        Ok(())
1804    }
1805
1806    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1807        if !self.concurrency_support() {
1808            return Ok(());
1809        }
1810        let state = self.concurrent_state_mut();
1811        let task = state.current_guest_thread()?.task;
1812        let instance = state.get_mut(task)?.instance.instance;
1813        let task_may_block = self.component_instance(instance).get_task_may_block();
1814
1815        if task_may_block {
1816            Ok(())
1817        } else {
1818            Err(Trap::CannotBlockSyncTask.into())
1819        }
1820    }
1821
1822    /// Record that we're about to enter a (sub-)component instance which does
1823    /// not support more than one concurrent, stackful activation, meaning it
1824    /// cannot be entered again until the next call returns.
1825    fn enter_instance(&mut self, instance: RuntimeInstance) {
1826        log::trace!("enter {instance:?}");
1827        self.instance_state(instance)
1828            .concurrent_state()
1829            .do_not_enter = true;
1830    }
1831
1832    /// Record that we've exited a (sub-)component instance previously entered
1833    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1834    /// See the documentation for the latter for details.
1835    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1836        log::trace!("exit {instance:?}");
1837        self.instance_state(instance)
1838            .concurrent_state()
1839            .do_not_enter = false;
1840        self.partition_pending(instance)
1841    }
1842
1843    /// Iterate over `InstanceState::pending`, moving any ready items into the
1844    /// "high priority" work item queue.
1845    ///
1846    /// See `GuestCall::is_ready` for details.
1847    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1848        for (thread, kind) in
1849            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1850        {
1851            let call = GuestCall { thread, kind };
1852            if call.is_ready(self)? {
1853                self.concurrent_state_mut()
1854                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
1855            } else {
1856                self.instance_state(instance)
1857                    .concurrent_state()
1858                    .pending
1859                    .insert(call.thread, call.kind);
1860            }
1861        }
1862
1863        Ok(())
1864    }
1865
1866    /// Implements the `backpressure.{inc,dec}` intrinsics.
1867    pub(crate) fn backpressure_modify(
1868        &mut self,
1869        caller_instance: RuntimeInstance,
1870        modify: impl FnOnce(u16) -> Option<u16>,
1871    ) -> Result<()> {
1872        let state = self.instance_state(caller_instance).concurrent_state();
1873        let old = state.backpressure;
1874        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1875        state.backpressure = new;
1876
1877        if old > 0 && new == 0 {
1878            // Backpressure was previously enabled and is now disabled; move any
1879            // newly-eligible guest calls to the "high priority" queue.
1880            self.partition_pending(caller_instance)?;
1881        }
1882
1883        Ok(())
1884    }
1885
1886    /// Resume the specified fiber, giving it exclusive access to the specified
1887    /// store.
1888    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1889        let old_thread = self.concurrent_state_mut().current_thread;
1890        log::trace!("resume_fiber: save current thread {old_thread:?}");
1891
1892        let fiber = fiber::resolve_or_release(self, fiber).await?;
1893
1894        self.set_thread(old_thread)?;
1895
1896        let state = self.concurrent_state_mut();
1897
1898        if let Some(ot) = old_thread.guest() {
1899            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1900        }
1901        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1902
1903        if let Some(mut fiber) = fiber {
1904            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1905            // See the `SuspendReason` documentation for what each case means.
1906            let reason = match state.suspend_reason.take() {
1907                Some(r) => r,
1908                None => bail_bug!("suspend reason missing when resuming fiber"),
1909            };
1910            match reason {
1911                SuspendReason::NeedWork => {
1912                    if state.worker.is_none() {
1913                        state.worker = Some(fiber);
1914                    } else {
1915                        fiber.dispose(self);
1916                    }
1917                }
1918                SuspendReason::Yielding {
1919                    thread,
1920                    cancellable,
1921                    ..
1922                } => {
1923                    state.get_mut(thread.thread)?.state =
1924                        GuestThreadState::Ready { fiber, cancellable };
1925                    let instance = state.get_mut(thread.task)?.instance.index;
1926                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1927                }
1928                SuspendReason::ExplicitlySuspending { thread, .. } => {
1929                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1930                }
1931                SuspendReason::Waiting { set, thread, .. } => {
1932                    let old = state
1933                        .get_mut(set)?
1934                        .waiting
1935                        .insert(thread, WaitMode::Fiber(fiber));
1936                    assert!(old.is_none());
1937                }
1938            };
1939        } else {
1940            log::trace!("resume_fiber: fiber has exited");
1941        }
1942
1943        Ok(())
1944    }
1945
1946    /// Suspend the current fiber, storing the reason in
1947    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1948    /// it should be resumed.
1949    ///
1950    /// See the `SuspendReason` documentation for details.
1951    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1952        log::trace!("suspend fiber: {reason:?}");
1953
1954        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1955        // pop the call context which manages resource borrows before suspending
1956        // and then push it again once we've resumed.
1957        let task = match &reason {
1958            SuspendReason::Yielding { thread, .. }
1959            | SuspendReason::Waiting { thread, .. }
1960            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1961            SuspendReason::NeedWork => None,
1962        };
1963
1964        let old_guest_thread = if task.is_some() {
1965            self.concurrent_state_mut().current_thread
1966        } else {
1967            CurrentThread::None
1968        };
1969
1970        // We should not have reached here unless either there's no current
1971        // task, or the current task is permitted to block.  In addition, we
1972        // special-case `thread.switch-to` and waiting for a subtask to go from
1973        // `starting` to `started`, both of which we consider non-blocking
1974        // operations despite requiring a suspend.
1975        debug_assert!(
1976            matches!(
1977                reason,
1978                SuspendReason::ExplicitlySuspending {
1979                    skip_may_block_check: true,
1980                    ..
1981                } | SuspendReason::Waiting {
1982                    skip_may_block_check: true,
1983                    ..
1984                } | SuspendReason::Yielding {
1985                    skip_may_block_check: true,
1986                    ..
1987                }
1988            ) || old_guest_thread
1989                .guest()
1990                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1991                .transpose()?
1992                .unwrap_or(true)
1993        );
1994
1995        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1996        assert!(suspend_reason.is_none());
1997        *suspend_reason = Some(reason);
1998
1999        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
2000
2001        if task.is_some() {
2002            self.set_thread(old_guest_thread)?;
2003        }
2004
2005        Ok(())
2006    }
2007
2008    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2009        let state = self.concurrent_state_mut();
2010
2011        if waitable.common(state)?.set.is_some() {
2012            bail!(Trap::WaitableSyncAndAsync);
2013        }
2014
2015        let caller = state.current_guest_thread()?;
2016        let set = state.get_mut(caller.thread)?.sync_call_set;
2017        waitable.join(state, Some(set))?;
2018        self.suspend(SuspendReason::Waiting {
2019            set,
2020            thread: caller,
2021            skip_may_block_check: false,
2022        })?;
2023        let state = self.concurrent_state_mut();
2024        waitable.join(state, None)
2025    }
2026
2027    /// Cleans up the data structures backing the `guest_thread` specified,
2028    /// removing it from the internal tables of `runtime_instance` as well.
2029    ///
2030    /// This function is used whenever a guest thread has fully exited and
2031    /// completed. This'll clean up the associated `GuestThread` structure and
2032    /// related resources it contains.
2033    ///
2034    /// Other functionality that this implements is:
2035    ///
2036    /// * This will perform conditional cleanup of the `GuestTask` that owns
2037    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
2038    /// * If there are no more threads in the `GuestTask` that this thread is
2039    ///   associated with, and if the task hasn't produced a result (e.g. it's not
2040    ///   returned or cancelled), then a trap will be raised that a result
2041    ///   wasn't ever produced.
2042    /// * If this task is finished, meaning the top-level thread exited and
2043    ///   additionally it's been returned or cancelled, then this will handle
2044    ///   management of the store's "active interesting tasks" counter.
2045    ///
2046    /// Effectively this is intended to be a "narrow waist" through which many
2047    /// destruction operations are funneled through.
2048    fn cleanup_thread(
2049        &mut self,
2050        guest_thread: QualifiedThreadId,
2051        runtime_instance: RuntimeInstance,
2052        cleanup_task: CleanupTask,
2053    ) -> Result<()> {
2054        let state = self.concurrent_state_mut();
2055        let thread_data = state.get_mut(guest_thread.thread)?;
2056        let sync_call_set = thread_data.sync_call_set;
2057        if let Some(guest_id) = thread_data.instance_rep {
2058            self.instance_state(runtime_instance)
2059                .thread_handle_table()
2060                .guest_thread_remove(guest_id)?;
2061        }
2062        let state = self.concurrent_state_mut();
2063
2064        // Clean up any pending subtasks in the sync_call_set
2065        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2066            if let Some(Event::Subtask {
2067                status: Status::Returned | Status::ReturnCancelled,
2068            }) = waitable.common(state)?.event
2069            {
2070                waitable.delete_from(state)?;
2071            }
2072        }
2073
2074        state.delete(guest_thread.thread)?;
2075        state.delete(sync_call_set)?;
2076        let task = state.get_mut(guest_thread.task)?;
2077        task.threads.remove(&guest_thread.thread);
2078
2079        if task.threads.is_empty() && !task.returned_or_cancelled() {
2080            bail!(Trap::NoAsyncResult);
2081        }
2082        let ready_to_delete = task.ready_to_delete();
2083
2084        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2085            task.decremented_interesting_task_count = true;
2086
2087            debug_assert!(state.interesting_tasks > 0);
2088            state.interesting_tasks -= 1;
2089            if state.interesting_tasks == 0
2090                && let Some(waker) = state.interesting_tasks_empty_waker.take()
2091            {
2092                waker.wake();
2093            }
2094        }
2095
2096        match cleanup_task {
2097            CleanupTask::Yes => {
2098                if ready_to_delete {
2099                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2100                }
2101            }
2102            CleanupTask::No => {}
2103        }
2104
2105        Ok(())
2106    }
2107
2108    /// Performs cancellation of the `guest_task` specified with the
2109    /// precondition that the task hasn't lowered its parameters.
2110    ///
2111    /// In this situation the task hasn't ever been started meaning it hasn't
2112    /// actually run any wasm code yet. This requires cleaning up metadata such
2113    /// as thread information attached to the task.
2114    ///
2115    /// The main two entrypoints for this function are:
2116    ///
2117    /// * Task cancellation via `subtask.cancel`, the intrinsic.
2118    /// * Dropping a host `call_async` future which needs to cancel the task
2119    ///   because it cannot reference its parameters any more.
2120    fn cancel_guest_subtask_without_lowered_parameters(
2121        &mut self,
2122        caller_instance: RuntimeInstance,
2123        guest_task: TableId<GuestTask>,
2124    ) -> Result<()> {
2125        let concurrent_state = self.concurrent_state_mut();
2126        let task = concurrent_state.get_mut(guest_task)?;
2127        assert!(!task.already_lowered_parameters());
2128        // The task is in a `starting` state, meaning it hasn't run at
2129        // all yet.  Here we update its fields to indicate that it is
2130        // ready to delete immediately once `subtask.drop` is called.
2131        task.lower_params = None;
2132        task.lift_result = None;
2133        task.exited = true;
2134        let instance = task.instance;
2135
2136        // Clean up the thread within this task as it's now never going
2137        // to run.
2138        assert_eq!(1, task.threads.len());
2139        let thread = *task.threads.iter().next().unwrap();
2140        self.cleanup_thread(
2141            QualifiedThreadId {
2142                task: guest_task,
2143                thread,
2144            },
2145            caller_instance,
2146            CleanupTask::No,
2147        )?;
2148
2149        // Not yet started; cancel and remove from pending
2150        let pending = &mut self.instance_state(instance).concurrent_state().pending;
2151        let pending_count = pending.len();
2152        pending.retain(|thread, _| thread.task != guest_task);
2153        // If there were no pending threads for this task, we're in an error state
2154        if pending.len() == pending_count {
2155            bail!(Trap::SubtaskCancelAfterTerminal);
2156        }
2157        Ok(())
2158    }
2159}
2160
/// Selects whether cleaning up a guest thread may also delete the task which
/// owns it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CleanupTask {
    /// Delete the owning task as well, provided it is ready to be deleted.
    Yes,
    /// Leave the owning task in place.
    No,
}
2165
2166impl Instance {
2167    /// Get the next pending event for the specified task and (optional)
2168    /// waitable set, along with the waitable handle if applicable.
2169    fn get_event(
2170        self,
2171        store: &mut StoreOpaque,
2172        guest_task: TableId<GuestTask>,
2173        set: Option<TableId<WaitableSet>>,
2174        cancellable: bool,
2175    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2176        let state = store.concurrent_state_mut();
2177
2178        let event = &mut state.get_mut(guest_task)?.event;
2179        if let Some(ev) = event
2180            && (cancellable || !matches!(ev, Event::Cancelled))
2181        {
2182            log::trace!("deliver event {ev:?} to {guest_task:?}");
2183            let ev = *ev;
2184            *event = None;
2185            return Ok(Some((ev, None)));
2186        }
2187
2188        let set = match set {
2189            Some(set) => set,
2190            None => return Ok(None),
2191        };
2192        let waitable = match state.get_mut(set)?.ready.pop_first() {
2193            Some(v) => v,
2194            None => return Ok(None),
2195        };
2196
2197        let common = waitable.common(state)?;
2198        let handle = match common.handle {
2199            Some(h) => h,
2200            None => bail_bug!("handle not set when delivering event"),
2201        };
2202        let event = match common.event.take() {
2203            Some(e) => e,
2204            None => bail_bug!("event not set when delivering event"),
2205        };
2206
2207        log::trace!(
2208            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2209        );
2210
2211        waitable.on_delivery(store, self, event)?;
2212
2213        Ok(Some((event, Some((waitable, handle)))))
2214    }
2215
2216    /// Handle the `CallbackCode` returned from an async-lifted export or its
2217    /// callback.
2218    ///
2219    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2220    /// using `handle_guest_call`.
2221    fn handle_callback_code(
2222        self,
2223        store: &mut StoreOpaque,
2224        guest_thread: QualifiedThreadId,
2225        runtime_instance: RuntimeComponentInstanceIndex,
2226        code: u32,
2227    ) -> Result<Option<GuestCall>> {
2228        let (code, set) = unpack_callback_code(code);
2229
2230        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2231
2232        let state = store.concurrent_state_mut();
2233
2234        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2235            let set = store
2236                .instance_state(self.runtime_instance(runtime_instance))
2237                .handle_table()
2238                .waitable_set_rep(handle)?;
2239
2240            Ok(TableId::<WaitableSet>::new(set))
2241        };
2242
2243        Ok(match code {
2244            callback_code::EXIT => {
2245                log::trace!("implicit thread {guest_thread:?} completed");
2246                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2247                task.exited = true;
2248                task.callback = None;
2249                store.cleanup_thread(
2250                    guest_thread,
2251                    self.runtime_instance(runtime_instance),
2252                    CleanupTask::Yes,
2253                )?;
2254                None
2255            }
2256            callback_code::YIELD => {
2257                let task = state.get_mut(guest_thread.task)?;
2258                // If an `Event::Cancelled` is pending, we'll deliver that;
2259                // otherwise, we'll deliver `Event::None`.  Note that
2260                // `GuestTask::event` is only ever set to one of those two
2261                // `Event` variants.
2262                if let Some(event) = task.event {
2263                    assert!(matches!(event, Event::None | Event::Cancelled));
2264                } else {
2265                    task.event = Some(Event::None);
2266                }
2267                let call = GuestCall {
2268                    thread: guest_thread,
2269                    kind: GuestCallKind::DeliverEvent {
2270                        instance: self,
2271                        set: None,
2272                    },
2273                };
2274                if state.may_block(guest_thread.task)? {
2275                    // Push this thread onto the "low priority" queue so it runs
2276                    // after any other threads have had a chance to run.
2277                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2278                    None
2279                } else {
2280                    // Yielding in a non-blocking context is defined as a no-op
2281                    // according to the spec, so we must run this thread
2282                    // immediately without allowing any others to run.
2283                    Some(call)
2284                }
2285            }
2286            callback_code::WAIT => {
2287                // The task may only return `WAIT` if it was created for a call
2288                // to an async export).  Otherwise, we'll trap.
2289                state.check_blocking_for(guest_thread.task)?;
2290
2291                let set = get_set(store, set)?;
2292                let state = store.concurrent_state_mut();
2293
2294                if state.get_mut(guest_thread.task)?.event.is_some()
2295                    || !state.get_mut(set)?.ready.is_empty()
2296                {
2297                    // An event is immediately available; deliver it ASAP.
2298                    state.push_high_priority(WorkItem::GuestCall(
2299                        runtime_instance,
2300                        GuestCall {
2301                            thread: guest_thread,
2302                            kind: GuestCallKind::DeliverEvent {
2303                                instance: self,
2304                                set: Some(set),
2305                            },
2306                        },
2307                    ));
2308                } else {
2309                    // No event is immediately available.
2310                    //
2311                    // We're waiting, so register to be woken up when an event
2312                    // is published for this waitable set.
2313                    //
2314                    // Here we also set `GuestTask::wake_on_cancel` which allows
2315                    // `subtask.cancel` to interrupt the wait.
2316                    let old = state
2317                        .get_mut(guest_thread.thread)?
2318                        .wake_on_cancel
2319                        .replace(set);
2320                    if !old.is_none() {
2321                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2322                    }
2323                    let old = state
2324                        .get_mut(set)?
2325                        .waiting
2326                        .insert(guest_thread, WaitMode::Callback(self));
2327                    if !old.is_none() {
2328                        bail_bug!("set's waiting set already had this thread registered");
2329                    }
2330                }
2331                None
2332            }
2333            _ => bail!(Trap::UnsupportedCallbackCode),
2334        })
2335    }
2336
2337    /// Add the specified guest call to the "high priority" work item queue, to
2338    /// be started as soon as backpressure and/or reentrance rules allow.
2339    ///
        /// The queued work item is a `GuestCallKind::StartImplicit` closure which,
        /// when run, makes `guest_thread` the current thread, lowers the
        /// parameters via `GuestTask::lower_params`, and invokes `callee`.  What
        /// happens after the call depends on the kind of export:
        ///
        /// - If `callback` is present (in which case `async_` is asserted to be
        ///   `true`), the first flat result is interpreted as a callback code and
        ///   handed to `handle_callback_code`.
        /// - Otherwise, if `async_` is `false`, the result is lifted using the
        ///   task's `lift_result`, `post_return` is passed to `call_post_return`,
        ///   and the task is completed with `Status::Returned`.
        /// - In either callback-less case the task is then marked as exited and
        ///   its implicit thread is cleaned up.
        ///
        /// `param_count` and `result_count` are the numbers of flat core Wasm
        /// parameters and results; both are passed through a single
        /// `MAX_FLAT_PARAMS`-sized buffer.
        ///
2340    /// SAFETY: The raw pointer arguments must be valid references to guest
2341    /// functions (with the appropriate signatures) when the closures queued by
2342    /// this function are called.
2343    unsafe fn queue_call<T: 'static>(
2344        self,
2345        mut store: StoreContextMut<T>,
2346        guest_thread: QualifiedThreadId,
2347        callee: SendSyncPtr<VMFuncRef>,
2348        param_count: usize,
2349        result_count: usize,
2350        async_: bool,
2351        callback: Option<SendSyncPtr<VMFuncRef>>,
2352        post_return: Option<SendSyncPtr<VMFuncRef>>,
2353    ) -> Result<()> {
2354        /// Return a closure which will call the specified function in the scope
2355        /// of the specified task.
2356        ///
2357        /// This will use `GuestTask::lower_params` to lower the parameters, but
2358        /// will not lift the result; instead, it returns a
2359        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2360        /// any, may be lifted.  Note that an async-lifted export will have
2361        /// returned its result using the `task.return` intrinsic (or not
2362        /// returned a result at all, in the case of `task.cancel`), in which
2363        /// case the "result" of this call will either be a callback code or
2364        /// nothing.
2365        ///
2366        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2367        /// the returned closure is called.
2368        unsafe fn make_call<T: 'static>(
2369            store: StoreContextMut<T>,
2370            guest_thread: QualifiedThreadId,
2371            callee: SendSyncPtr<VMFuncRef>,
2372            param_count: usize,
2373            result_count: usize,
2374        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2375        + Send
2376        + Sync
2377        + 'static
2378        + use<T> {
2379            let token = StoreToken::new(store);
2380            move |store: &mut dyn VMStore| {
                    // Scratch buffer used both for the lowered parameters and for
                    // the flat results written back by the callee.
2381                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2382
2383                store
2384                    .concurrent_state_mut()
2385                    .get_mut(guest_thread.thread)?
2386                    .state = GuestThreadState::Running;
2387                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                    // `lower_params` is taken out of the task, so it can only be
                    // used once; finding it missing is reported as a bug.
2388                let lower = match task.lower_params.take() {
2389                    Some(l) => l,
2390                    None => bail_bug!("lower_params missing"),
2391                };
2392
2393                lower(store, &mut storage[..param_count])?;
2394
2395                let mut store = token.as_context_mut(store);
2396
2397                // SAFETY: Per the contract documented in `make_call`'s
2398                // documentation, `callee` must be a valid pointer.
2399                unsafe {
2400                    crate::Func::call_unchecked_raw(
2401                        &mut store,
2402                        callee.as_non_null(),
2403                        NonNull::new(
2404                            &mut storage[..param_count.max(result_count)]
2405                                as *mut [MaybeUninit<ValRaw>] as _,
2406                        )
2407                        .unwrap(),
2408                    )?;
2409                }
2410
2411                Ok(storage)
2412            }
2413        }
2414
2415        // SAFETY: Per the contract described in this function documentation,
2416        // the `callee` pointer which `call` closes over must be valid when
2417        // called by the closure we queue below.
2418        let call = unsafe {
2419            make_call(
2420                store.as_context_mut(),
2421                guest_thread,
2422                callee,
2423                param_count,
2424                result_count,
2425            )
2426        };
2427
            // The instance recorded on the task being started; both closures
            // below run the call against this instance.
2428        let callee_instance = store
2429            .0
2430            .concurrent_state_mut()
2431            .get_mut(guest_thread.task)?
2432            .instance;
2433
2434        let fun = if callback.is_some() {
                // Callers must only supply a callback for async-lifted exports.
2435            assert!(async_);
2436
2437            Box::new(move |store: &mut dyn VMStore| {
2438                self.add_guest_thread_to_instance_table(
2439                    guest_thread.thread,
2440                    store,
2441                    callee_instance.index,
2442                )?;
                    // Make `guest_thread` current for the duration of the call,
                    // remembering the previous thread so it can be restored.
2443                let old_thread = store.set_thread(guest_thread)?;
2444                log::trace!(
2445                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2446                );
2447
2448                store.enter_instance(callee_instance);
2449
2450                // SAFETY: See the documentation for `make_call` to review the
2451                // contract we must uphold for `call` here.
2452                //
2453                // Per the contract described in the `queue_call`
2454                // documentation, the `callee` pointer which `call` closes
2455                // over must be valid.
2456                let storage = call(store)?;
2457
2458                store.exit_instance(callee_instance)?;
2459
2460                store.set_thread(old_thread)?;
2461                let state = store.concurrent_state_mut();
                    // If the restored thread is a guest thread, mark it as running
                    // again.
2462                if let Some(t) = old_thread.guest() {
2463                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
2464                }
2465                log::trace!("stackless call: restored {old_thread:?} as current thread");
2466
2467                // SAFETY: `wasmparser` will have validated that the callback
2468                // function returns a `i32` result.
2469                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2470
2471                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2472            })
2473                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2474        } else {
2475            let token = StoreToken::new(store.as_context_mut());
2476            Box::new(move |store: &mut dyn VMStore| {
2477                self.add_guest_thread_to_instance_table(
2478                    guest_thread.thread,
2479                    store,
2480                    callee_instance.index,
2481                )?;
2482                let old_thread = store.set_thread(guest_thread)?;
2483                log::trace!(
2484                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2485                );
                    // Fetched up front; only used as an argument to
                    // `call_post_return` in the sync-lifted path below.
2486                let flags = self.id().get(store).instance_flags(callee_instance.index);
2487
2488                // Unless this is a callback-less (i.e. stackful)
2489                // async-lifted export, we need to record that the instance
2490                // cannot be entered until the call returns.
2491                if !async_ {
2492                    store.enter_instance(callee_instance);
2493                }
2494
2495                // SAFETY: See the documentation for `make_call` to review the
2496                // contract we must uphold for `call` here.
2497                //
2498                // Per the contract described in the `queue_call`
2499                // documentation, the `callee` pointer which `call` closes
2500                // over must be valid.
2501                let storage = call(store)?;
2502
2503                if !async_ {
2504                    // This is a sync-lifted export, so now is when we lift the
2505                    // result, optionally call the post-return function, if any,
2506                    // and finally notify any current or future waiters that the
2507                    // subtask has returned.
2508
2509                    let lift = {
2510                        store.exit_instance(callee_instance)?;
2511
2512                        let state = store.concurrent_state_mut();
2513                        if !state.get_mut(guest_thread.task)?.result.is_none() {
2514                            bail_bug!("task has already produced a result");
2515                        }
2516
2517                        match state.get_mut(guest_thread.task)?.lift_result.take() {
2518                            Some(lift) => lift,
2519                            None => bail_bug!("lift_result field is missing"),
2520                        }
2521                    };
2522
2523                    // SAFETY: `result_count` represents the number of core Wasm
2524                    // results returned, per `wasmparser`.
2525                    let result = (lift.lift)(store, unsafe {
2526                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2527                            &storage[..result_count],
2528                        )
2529                    })?;
2530
                        // The post-return function is given the single flat result
                        // when there is one, and a zero `i32` when there is none.
                        // Any other `result_count` hits `unreachable!()`.
2531                    let post_return_arg = match result_count {
2532                        0 => ValRaw::i32(0),
2533                        // SAFETY: `result_count` represents the number of
2534                        // core Wasm results returned, per `wasmparser`.
2535                        1 => unsafe { storage[0].assume_init() },
2536                        _ => unreachable!(),
2537                    };
2538
                        // SAFETY: Per the contract described in the `queue_call`
                        // documentation, `post_return` (if any) must be a valid
                        // reference to a guest function at this point.
2539                    unsafe {
2540                        call_post_return(
2541                            token.as_context_mut(store),
2542                            post_return.map(|v| v.as_non_null()),
2543                            post_return_arg,
2544                            flags,
2545                        )?;
2546                    }
2547
2548                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2549                }
2550
2551                store.set_thread(old_thread)?;
2552
2553                store
2554                    .concurrent_state_mut()
2555                    .get_mut(guest_thread.task)?
2556                    .exited = true;
2557
2558                // This is a callback-less call, so the implicit thread has now completed
2559                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2560                Ok(None)
2561            })
2562        };
2563
            // Hand the closure to the event loop as a high-priority work item
            // for the callee's instance.
2564        store
2565            .0
2566            .concurrent_state_mut()
2567            .push_high_priority(WorkItem::GuestCall(
2568                callee_instance.index,
2569                GuestCall {
2570                    thread: guest_thread,
2571                    kind: GuestCallKind::StartImplicit(fun),
2572                },
2573            ));
2574
2575        Ok(())
2576    }
2577
2578    /// Prepare (but do not start) a guest->guest call.
2579    ///
2580    /// This is called from fused adapter code generated in
2581    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2582    /// are synthesized Wasm functions which move the parameters from the caller
2583    /// to the callee and the result from the callee to the caller,
2584    /// respectively.  The adapter will call `Self::start_call` immediately
2585    /// after calling this function.
2586    ///
        /// `caller_info` carries the caller's flat parameters and says whether the
        /// caller used a sync or an async lower; from it this function works out
        /// whether results are returned directly or written through a pointer
        /// passed as the caller's last parameter.  A new guest task is created
        /// with the current guest thread recorded as its caller, and the new
        /// task's thread is left as the store's current thread on success.
        ///
        /// If the caller used a sync lower and `callee_async` is `true`, the call
        /// traps unless `check_blocking` succeeds.
        ///
2587    /// SAFETY: All the pointer arguments must be valid pointers to guest
2588    /// entities (and with the expected signatures for the function references
2589    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2590    unsafe fn prepare_call<T: 'static>(
2591        self,
2592        mut store: StoreContextMut<T>,
2593        start: NonNull<VMFuncRef>,
2594        return_: NonNull<VMFuncRef>,
2595        caller_instance: RuntimeComponentInstanceIndex,
2596        callee_instance: RuntimeComponentInstanceIndex,
2597        task_return_type: TypeTupleIndex,
2598        callee_async: bool,
2599        memory: *mut VMMemoryDefinition,
2600        string_encoding: StringEncoding,
2601        caller_info: CallerInfo,
2602    ) -> Result<()> {
2603        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2604            // A task may only call an async-typed function via a sync lower if
2605            // it was created by a call to an async export.  Otherwise, we'll
2606            // trap.
2607            store.0.check_blocking()?;
2608        }
2609
            // Where the callee's results go from the caller's point of view:
            // `Heap` holds the pointer taken from the caller's last parameter,
            // `Stack` holds the number of results returned directly.
2610        enum ResultInfo {
2611            Heap { results: u32 },
2612            Stack { result_count: u32 },
2613        }
2614
            // Async callers with a result, and sync callers with more than
            // `MAX_FLAT_RESULTS` results, use the heap form; everything else
            // uses the stack form.
2615        let result_info = match &caller_info {
2616            CallerInfo::Async {
2617                has_result: true,
2618                params,
2619            } => ResultInfo::Heap {
2620                results: match params.last() {
2621                    Some(r) => r.get_u32(),
2622                    None => bail_bug!("retptr missing"),
2623                },
2624            },
2625            CallerInfo::Async {
2626                has_result: false, ..
2627            } => ResultInfo::Stack { result_count: 0 },
2628            CallerInfo::Sync {
2629                result_count,
2630                params,
2631            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2632                results: match params.last() {
2633                    Some(r) => r.get_u32(),
2634                    None => bail_bug!("arg ptr missing"),
2635                },
2636            },
2637            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2638                result_count: *result_count,
2639            },
2640        };
2641
2642        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2643
2644        // Create a new guest task for the call, closing over the `start` and
2645        // `return_` functions to lift the parameters and lower the result,
2646        // respectively.
2647        let start = SendSyncPtr::new(start);
2648        let return_ = SendSyncPtr::new(return_);
2649        let token = StoreToken::new(store.as_context_mut());
2650        let state = store.0.concurrent_state_mut();
2651        let old_thread = state.current_guest_thread()?;
2652
            // Sanity check (debug builds only): the current thread's task is
            // expected to belong to the caller's instance.
2653        debug_assert_eq!(
2654            state.get_mut(old_thread.task)?.instance,
2655            self.runtime_instance(caller_instance)
2656        );
2657
2658        let guest_thread = GuestTask::new(
2659            state,
2660            Box::new(move |store, dst| {
2661                let mut store = token.as_context_mut(store);
2662                assert!(dst.len() <= MAX_FLAT_PARAMS);
2663                // The `+ 1` here accounts for the return pointer, if any:
2664                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2665                let count = match caller_info {
2666                    // Async callers, if they have a result, use the last
2667                    // parameter as a return pointer so chop that off if
2668                    // relevant here.
2669                    CallerInfo::Async { params, has_result } => {
2670                        let params = &params[..params.len() - usize::from(has_result)];
2671                        for (param, src) in params.iter().zip(&mut src) {
2672                            src.write(*param);
2673                        }
2674                        params.len()
2675                    }
2676
2677                    // Sync callers forward everything directly.
2678                    CallerInfo::Sync { params, .. } => {
2679                        for (param, src) in params.iter().zip(&mut src) {
2680                            src.write(*param);
2681                        }
2682                        params.len()
2683                    }
2684                };
2685                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2686                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2687                // how it was constructed (see
2688                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2689                // for details) we know it takes `count` parameters and returns
2690                // `dst.len()` results.
2691                unsafe {
2692                    crate::Func::call_unchecked_raw(
2693                        &mut store,
2694                        start.as_non_null(),
2695                        NonNull::new(
2696                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2697                        )
2698                        .unwrap(),
2699                    )?;
2700                }
                    // `start` wrote its results over the front of `src`; copy
                    // them out as the callee's lowered parameters.
2701                dst.copy_from_slice(&src[..dst.len()]);
2702                let state = store.0.concurrent_state_mut();
                    // Publish a `Status::Started` subtask event on the task of
                    // the thread that is current at this point.
2703                Waitable::Guest(state.current_guest_thread()?.task).set_event(
2704                    state,
2705                    Some(Event::Subtask {
2706                        status: Status::Started,
2707                    }),
2708                )?;
2709                Ok(())
2710            }),
2711            LiftResult {
2712                lift: Box::new(move |store, src| {
2713                    // SAFETY: See comment in closure passed as `lower_params`
2714                    // parameter above.
2715                    let mut store = token.as_context_mut(store);
2716                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                        // For heap results, the caller's result pointer is appended
                        // as an extra trailing argument to `return_`.
2717                    if let ResultInfo::Heap { results } = &result_info {
2718                        my_src.push(ValRaw::u32(*results));
2719                    }
2720
2721                    // Execute the `return_` hook, generated by Wasmtime's FACT
2722                    // compiler, in the context of the old thread. The old
2723                    // thread, this thread's caller, may have `realloc`
2724                    // callbacks invoked for example and those need the correct
2725                    // context set for the current thread.
2726                    let prev = store.0.set_thread(old_thread)?;
2727
2728                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2729                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2730                    // on how it was constructed (see
2731                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2732                    // for details) we know it takes `src.len()` parameters and
2733                    // returns up to 1 result.
2734                    unsafe {
2735                        crate::Func::call_unchecked_raw(
2736                            &mut store,
2737                            return_.as_non_null(),
2738                            my_src.as_mut_slice().into(),
2739                        )?;
2740                    }
2741
2742                    // Restore the previous current thread after the
2743                    // lifting/lowering has returned.
2744                    store.0.set_thread(prev)?;
2745
2746                    let state = store.0.concurrent_state_mut();
2747                    let thread = state.current_guest_thread()?;
                        // For sync callers, record on the current thread's task that
                        // the result has been produced, together with the single
                        // stack result when there is exactly one (`None` for zero
                        // stack results and for heap results).
2748                    if sync_caller {
2749                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2750                            if let ResultInfo::Stack { result_count } = &result_info {
2751                                match result_count {
2752                                    0 => None,
2753                                    1 => Some(my_src[0]),
2754                                    _ => unreachable!(),
2755                                }
2756                            } else {
2757                                None
2758                            },
2759                        );
2760                    }
2761                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2762                }),
2763                ty: task_return_type,
2764                memory: NonNull::new(memory).map(SendSyncPtr::new),
2765                string_encoding,
2766            },
2767            Caller::Guest { thread: old_thread },
2768            None,
2769            self.runtime_instance(callee_instance),
2770            callee_async,
2771        )?;
2772
2773        // Make the new thread the current one so that `Self::start_call` knows
2774        // which one to start.
2775        store.0.set_thread(guest_thread)?;
2776        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2777
2778        Ok(())
2779    }
2780
2781    /// Call the specified callback function for an async-lifted export.
2782    ///
2783    /// SAFETY: `function` must be a valid reference to a guest function of the
2784    /// correct signature for a callback.
2785    unsafe fn call_callback<T>(
2786        self,
2787        mut store: StoreContextMut<T>,
2788        function: SendSyncPtr<VMFuncRef>,
2789        event: Event,
2790        handle: u32,
2791    ) -> Result<u32> {
2792        let (ordinal, result) = event.parts();
2793        let params = &mut [
2794            ValRaw::u32(ordinal),
2795            ValRaw::u32(handle),
2796            ValRaw::u32(result),
2797        ];
2798        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2799        // `wasmtime-cranelift`-generated fused adapter code or
2800        // `component::Options`.  Per `wasmparser` callback signature
2801        // validation, we know it takes three parameters and returns one.
2802        unsafe {
2803            crate::Func::call_unchecked_raw(
2804                &mut store,
2805                function.as_non_null(),
2806                params.as_mut_slice().into(),
2807            )?;
2808        }
2809        Ok(params[0].get_u32())
2810    }
2811
2812    /// Start a guest->guest call previously prepared using
2813    /// `Self::prepare_call`.
2814    ///
2815    /// This is called from fused adapter code generated in
2816    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2817    /// this function immediately after calling `Self::prepare_call`.
2818    ///
2819    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2820    /// functions with the appropriate signatures for the current guest task.
2821    /// If this is a call to an async-lowered import, the actual call may be
2822    /// deferred and run after this function returns, in which case the pointer
2823    /// arguments must also be valid when the call happens.
2824    unsafe fn start_call<T: 'static>(
2825        self,
2826        mut store: StoreContextMut<T>,
2827        callback: *mut VMFuncRef,
2828        post_return: *mut VMFuncRef,
2829        callee: NonNull<VMFuncRef>,
2830        param_count: u32,
2831        result_count: u32,
2832        flags: u32,
2833        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2834    ) -> Result<u32> {
2835        let token = StoreToken::new(store.as_context_mut());
2836        let async_caller = storage.is_none();
2837        let state = store.0.concurrent_state_mut();
2838        let guest_thread = state.current_guest_thread()?;
2839        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2840        let callee = SendSyncPtr::new(callee);
2841        let param_count = usize::try_from(param_count)?;
2842        assert!(param_count <= MAX_FLAT_PARAMS);
2843        let result_count = usize::try_from(result_count)?;
2844        assert!(result_count <= MAX_FLAT_RESULTS);
2845
2846        let task = state.get_mut(guest_thread.task)?;
2847        if let Some(callback) = NonNull::new(callback) {
2848            // We're calling an async-lifted export with a callback, so store
2849            // the callback and related context as part of the task so we can
2850            // call it later when needed.
2851            let callback = SendSyncPtr::new(callback);
2852            task.callback = Some(Box::new(move |store, event, handle| {
2853                let store = token.as_context_mut(store);
2854                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2855            }));
2856        }
2857
2858        let Caller::Guest { thread: caller } = &task.caller else {
2859            // As of this writing, `start_call` is only used for guest->guest
2860            // calls.
2861            bail_bug!("start_call unexpectedly invoked for host->guest call");
2862        };
2863        let caller = *caller;
2864        let caller_instance = state.get_mut(caller.task)?.instance;
2865
2866        // Queue the call as a "high priority" work item.
2867        unsafe {
2868            self.queue_call(
2869                store.as_context_mut(),
2870                guest_thread,
2871                callee,
2872                param_count,
2873                result_count,
2874                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2875                NonNull::new(callback).map(SendSyncPtr::new),
2876                NonNull::new(post_return).map(SendSyncPtr::new),
2877            )?;
2878        }
2879
2880        let state = store.0.concurrent_state_mut();
2881
2882        // Use the caller's `GuestThread::sync_call_set` to register interest in
2883        // the subtask...
2884        let guest_waitable = Waitable::Guest(guest_thread.task);
2885        let old_set = guest_waitable.common(state)?.set;
2886        let set = state.get_mut(caller.thread)?.sync_call_set;
2887        guest_waitable.join(state, Some(set))?;
2888
2889        store.0.set_thread(CurrentThread::None)?;
2890
2891        // ... and suspend this fiber temporarily while we wait for it to start.
2892        //
2893        // Note that we _could_ call the callee directly using the current fiber
2894        // rather than suspend this one, but that would make reasoning about the
2895        // event loop more complicated and is probably only worth doing if
2896        // there's a measurable performance benefit.  In addition, it would mean
2897        // blocking the caller if the callee calls a blocking sync-lowered
2898        // import, and as of this writing the spec says we must not do that.
2899        //
2900        // Alternatively, the fused adapter code could be modified to call the
2901        // callee directly without calling a host-provided intrinsic at all (in
2902        // which case it would need to do its own, inline backpressure checks,
2903        // etc.).  Again, we'd want to see a measurable performance benefit
2904        // before committing to such an optimization.  And again, we'd need to
2905        // update the spec to allow that.
2906        let (status, waitable) = loop {
2907            store.0.suspend(SuspendReason::Waiting {
2908                set,
2909                thread: caller,
2910                // Normally, `StoreOpaque::suspend` would assert it's being
2911                // called from a context where blocking is allowed.  However, if
2912                // `async_caller` is `true`, we'll only "block" long enough for
2913                // the callee to start, i.e. we won't repeat this loop, so we
2914                // tell `suspend` it's okay even if we're not allowed to block.
2915                // Alternatively, if the callee is not an async function, then
2916                // we know it won't block anyway.
2917                skip_may_block_check: async_caller || !callee_async,
2918            })?;
2919
2920            let state = store.0.concurrent_state_mut();
2921
2922            log::trace!("taking event for {:?}", guest_thread.task);
2923            let event = guest_waitable.take_event(state)?;
2924            let Some(Event::Subtask { status }) = event else {
2925                bail_bug!("subtasks should only get subtask events, got {event:?}")
2926            };
2927
2928            log::trace!("status {status:?} for {:?}", guest_thread.task);
2929
2930            if status == Status::Returned {
2931                // It returned, so we can stop waiting.
2932                break (status, None);
2933            } else if async_caller {
2934                // It hasn't returned yet, but the caller is calling via an
2935                // async-lowered import, so we generate a handle for the task
2936                // waitable and return the status.
2937                let handle = store
2938                    .0
2939                    .instance_state(caller_instance)
2940                    .handle_table()
2941                    .subtask_insert_guest(guest_thread.task.rep())?;
2942                store
2943                    .0
2944                    .concurrent_state_mut()
2945                    .get_mut(guest_thread.task)?
2946                    .common
2947                    .handle = Some(handle);
2948                break (status, Some(handle));
2949            } else {
2950                // The callee hasn't returned yet, and the caller is calling via
2951                // a sync-lowered import, so we loop and keep waiting until the
2952                // callee returns.
2953            }
2954        };
2955
2956        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2957
2958        // Reset the current thread to point to the caller as it resumes control.
2959        store.0.set_thread(caller)?;
2960        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2961        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2962
2963        if let Some(storage) = storage {
2964            // The caller used a sync-lowered import to call an async-lifted
2965            // export, in which case the result, if any, has been stashed in
2966            // `GuestTask::sync_result`.
2967            let state = store.0.concurrent_state_mut();
2968            let task = state.get_mut(guest_thread.task)?;
2969            if let Some(result) = task.sync_result.take()? {
2970                if let Some(result) = result {
2971                    storage[0] = MaybeUninit::new(result);
2972                }
2973
2974                if task.exited && task.ready_to_delete() {
2975                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2976                }
2977            }
2978        }
2979
2980        Ok(status.pack(waitable))
2981    }
2982
2983    /// Poll the specified future once on behalf of a guest->host call using an
2984    /// async-lowered import.
2985    ///
2986    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2987    /// `Pending`, add it to the set of futures to be polled as part of this
2988    /// instance's event loop until it completes, and then return
2989    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2990    ///
2991    /// Whether the future returns `Ready` immediately or later, the `lower`
2992    /// function will be used to lower the result, if any, into the guest caller's
2993    /// stack and linear memory. The `lower` function is invoked with `None` if
2994    /// the future is cancelled.
2995    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2996        self,
2997        mut store: StoreContextMut<'_, T>,
2998        future: impl Future<Output = Result<R>> + Send + 'static,
2999        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
3000    ) -> Result<Option<u32>> {
3001        let token = StoreToken::new(store.as_context_mut());
3002        let state = store.0.concurrent_state_mut();
3003        let task = state.current_host_thread()?;
3004
3005        // Create an abortable future which hooks calls to poll and manages call
3006        // context state for the future.
3007        let (join_handle, future) = JoinHandle::run(future);
3008        {
3009            let state = &mut state.get_mut(task)?.state;
3010            assert!(matches!(state, HostTaskState::CalleeStarted));
3011            *state = HostTaskState::CalleeRunning(join_handle);
3012        }
3013
3014        let mut future = Box::pin(future);
3015
3016        // Finally, poll the future.  We can use a dummy `Waker` here because
3017        // we'll add the future to `ConcurrentState::futures` and poll it
3018        // automatically from the event loop if it doesn't complete immediately
3019        // here.
3020        let poll = tls::set(store.0, || {
3021            future
3022                .as_mut()
3023                .poll(&mut Context::from_waker(&Waker::noop()))
3024        });
3025
3026        match poll {
3027            // It finished immediately; lower the result and delete the task.
3028            Poll::Ready(result) => {
3029                let result = result.transpose()?;
3030                lower(store.as_context_mut(), result)?;
3031                return Ok(None);
3032            }
3033
3034            // Future isn't ready yet, so fall through.
3035            Poll::Pending => {}
3036        }
3037
3038        // It hasn't finished yet; add the future to
3039        // `ConcurrentState::futures` so it will be polled by the event
3040        // loop and allocate a waitable handle to return to the guest.
3041
3042        // Wrap the future in a closure responsible for lowering the result into
3043        // the guest's stack and memory, as well as notifying any waiters that
3044        // the task returned.
3045        let future = Box::pin(async move {
3046            let result = match future.await {
3047                Some(result) => Some(result?),
3048                None => None,
3049            };
3050            let on_complete = move |store: &mut dyn VMStore| {
3051                // Restore the `current_thread` to be the host so `lower` knows
3052                // how to manipulate borrows and knows which scope of borrows
3053                // to check.
3054                let mut store = token.as_context_mut(store);
3055                let old = store.0.set_thread(task)?;
3056
3057                let status = if result.is_some() {
3058                    Status::Returned
3059                } else {
3060                    Status::ReturnCancelled
3061                };
3062
3063                lower(store.as_context_mut(), result)?;
3064                let state = store.0.concurrent_state_mut();
3065                match &mut state.get_mut(task)?.state {
3066                    // The task is already flagged as finished because it was
3067                    // cancelled. No need to transition further.
3068                    HostTaskState::CalleeDone { .. } => {}
3069
3070                    // Otherwise transition this task to the done state.
3071                    other => *other = HostTaskState::CalleeDone { cancelled: false },
3072                }
3073                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3074
3075                store.0.set_thread(old)?;
3076                Ok(())
3077            };
3078
3079            // Here we schedule a task to run on a worker fiber to do the
3080            // lowering since it may involve a call to the guest's realloc
3081            // function. This is necessary because calling the guest while
3082            // there are host embedder frames on the stack is unsound.
3083            tls::get(move |store| {
3084                store
3085                    .concurrent_state_mut()
3086                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3087                        on_complete,
3088                    ))));
3089                Ok(())
3090            })
3091        });
3092
3093        // Make this task visible to the guest and then record what it
3094        // was made visible as.
3095        let state = store.0.concurrent_state_mut();
3096        state.push_future(future);
3097        let caller = state.get_mut(task)?.caller;
3098        let instance = state.get_mut(caller.task)?.instance;
3099        let handle = store
3100            .0
3101            .instance_state(instance)
3102            .handle_table()
3103            .subtask_insert_host(task.rep())?;
3104        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3105        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3106
3107        // Restore the currently running thread to this host task's
3108        // caller. Note that the host task isn't deallocated as it's
3109        // within the store and will get deallocated later.
3110        store.0.set_thread(caller)?;
3111        Ok(Some(handle))
3112    }
3113
3114    /// Implements the `task.return` intrinsic, lifting the result for the
3115    /// current guest task.
3116    pub(crate) fn task_return(
3117        self,
3118        store: &mut dyn VMStore,
3119        ty: TypeTupleIndex,
3120        options: OptionsIndex,
3121        storage: &[ValRaw],
3122    ) -> Result<()> {
3123        let state = store.concurrent_state_mut();
3124        let guest_thread = state.current_guest_thread()?;
3125        let lift = state
3126            .get_mut(guest_thread.task)?
3127            .lift_result
3128            .take()
3129            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3130        if !state.get_mut(guest_thread.task)?.result.is_none() {
3131            bail_bug!("task result unexpectedly already set");
3132        }
3133
3134        let CanonicalOptions {
3135            string_encoding,
3136            data_model,
3137            ..
3138        } = &self.id().get(store).component().env_component().options[options];
3139
3140        let invalid = ty != lift.ty
3141            || string_encoding != &lift.string_encoding
3142            || match data_model {
3143                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3144                    Some(memory) => {
3145                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3146                        let actual = self.id().get(store).runtime_memory(memory);
3147                        expected != actual.as_ptr()
3148                    }
3149                    // Memory not specified, meaning it didn't need to be
3150                    // specified per validation, so not invalid.
3151                    None => false,
3152                },
3153                // Always invalid as this isn't supported.
3154                CanonicalOptionsDataModel::Gc { .. } => true,
3155            };
3156
3157        if invalid {
3158            bail!(Trap::TaskReturnInvalid);
3159        }
3160
3161        log::trace!("task.return for {guest_thread:?}");
3162
3163        let result = (lift.lift)(store, storage)?;
3164        self.task_complete(store, guest_thread.task, result, Status::Returned)
3165    }
3166
3167    /// Implements the `task.cancel` intrinsic.
3168    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3169        let state = store.concurrent_state_mut();
3170        let guest_thread = state.current_guest_thread()?;
3171        let task = state.get_mut(guest_thread.task)?;
3172        if !task.cancel_sent {
3173            bail!(Trap::TaskCancelNotCancelled);
3174        }
3175        _ = task
3176            .lift_result
3177            .take()
3178            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3179
3180        if !task.result.is_none() {
3181            bail_bug!("task result should not bet set yet");
3182        }
3183
3184        log::trace!("task.cancel for {guest_thread:?}");
3185
3186        self.task_complete(
3187            store,
3188            guest_thread.task,
3189            Box::new(DummyResult),
3190            Status::ReturnCancelled,
3191        )
3192    }
3193
3194    /// Complete the specified guest task (i.e. indicate that it has either
3195    /// returned a (possibly empty) result or cancelled itself).
3196    ///
3197    /// This will return any resource borrows and notify any current or future
3198    /// waiters that the task has completed.
3199    fn task_complete(
3200        self,
3201        store: &mut StoreOpaque,
3202        guest_task: TableId<GuestTask>,
3203        result: Box<dyn Any + Send + Sync>,
3204        status: Status,
3205    ) -> Result<()> {
3206        store
3207            .component_resource_tables(Some(self))
3208            .validate_scope_exit()?;
3209
3210        let state = store.concurrent_state_mut();
3211        let task = state.get_mut(guest_task)?;
3212
3213        if let Caller::Host { tx, .. } = &mut task.caller {
3214            if let Some(tx) = tx.take() {
3215                _ = tx.send(result);
3216            }
3217        } else {
3218            task.result = Some(result);
3219            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3220        }
3221
3222        Ok(())
3223    }
3224
3225    /// Implements the `waitable-set.new` intrinsic.
3226    pub(crate) fn waitable_set_new(
3227        self,
3228        store: &mut StoreOpaque,
3229        caller_instance: RuntimeComponentInstanceIndex,
3230    ) -> Result<u32> {
3231        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3232        let handle = store
3233            .instance_state(self.runtime_instance(caller_instance))
3234            .handle_table()
3235            .waitable_set_insert(set.rep())?;
3236        log::trace!("new waitable set {set:?} (handle {handle})");
3237        Ok(handle)
3238    }
3239
3240    /// Implements the `waitable-set.drop` intrinsic.
3241    pub(crate) fn waitable_set_drop(
3242        self,
3243        store: &mut StoreOpaque,
3244        caller_instance: RuntimeComponentInstanceIndex,
3245        set: u32,
3246    ) -> Result<()> {
3247        let rep = store
3248            .instance_state(self.runtime_instance(caller_instance))
3249            .handle_table()
3250            .waitable_set_remove(set)?;
3251
3252        log::trace!("drop waitable set {rep} (handle {set})");
3253
3254        // Note that we're careful to check for waiters _before_ deleting the
3255        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3256        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3257        if !store
3258            .concurrent_state_mut()
3259            .get_mut(TableId::<WaitableSet>::new(rep))?
3260            .waiting
3261            .is_empty()
3262        {
3263            bail!(Trap::WaitableSetDropHasWaiters);
3264        }
3265
3266        store
3267            .concurrent_state_mut()
3268            .delete(TableId::<WaitableSet>::new(rep))?;
3269
3270        Ok(())
3271    }
3272
3273    /// Implements the `waitable.join` intrinsic.
3274    pub(crate) fn waitable_join(
3275        self,
3276        store: &mut StoreOpaque,
3277        caller_instance: RuntimeComponentInstanceIndex,
3278        waitable_handle: u32,
3279        set_handle: u32,
3280    ) -> Result<()> {
3281        let mut instance = self.id().get_mut(store);
3282        let waitable =
3283            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3284
3285        let set = if set_handle == 0 {
3286            None
3287        } else {
3288            let set = instance.instance_states().0[caller_instance]
3289                .handle_table()
3290                .waitable_set_rep(set_handle)?;
3291
3292            let state = store.concurrent_state_mut();
3293            if let Some(old) = waitable.common(state)?.set
3294                && state.get_mut(old)?.is_sync_call_set
3295            {
3296                bail!(Trap::WaitableSyncAndAsync);
3297            }
3298
3299            Some(TableId::<WaitableSet>::new(set))
3300        };
3301
3302        log::trace!(
3303            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3304        );
3305
3306        waitable.join(store.concurrent_state_mut(), set)
3307    }
3308
3309    /// Implements the `subtask.drop` intrinsic.
3310    pub(crate) fn subtask_drop(
3311        self,
3312        store: &mut StoreOpaque,
3313        caller_instance: RuntimeComponentInstanceIndex,
3314        task_id: u32,
3315    ) -> Result<()> {
3316        self.waitable_join(store, caller_instance, task_id, 0)?;
3317
3318        let (rep, is_host) = store
3319            .instance_state(self.runtime_instance(caller_instance))
3320            .handle_table()
3321            .subtask_remove(task_id)?;
3322
3323        let concurrent_state = store.concurrent_state_mut();
3324        let (waitable, delete) = if is_host {
3325            let id = TableId::<HostTask>::new(rep);
3326            let task = concurrent_state.get_mut(id)?;
3327            match &task.state {
3328                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3329                HostTaskState::CalleeDone { .. } => {}
3330                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3331                    bail_bug!("invalid state for callee in `subtask.drop`")
3332                }
3333            }
3334            (Waitable::Host(id), true)
3335        } else {
3336            let id = TableId::<GuestTask>::new(rep);
3337            let task = concurrent_state.get_mut(id)?;
3338            if task.lift_result.is_some() {
3339                bail!(Trap::SubtaskDropNotResolved);
3340            }
3341            (
3342                Waitable::Guest(id),
3343                concurrent_state.get_mut(id)?.ready_to_delete(),
3344            )
3345        };
3346
3347        waitable.common(concurrent_state)?.handle = None;
3348
3349        // If this subtask has an event that means that the terminal status of
3350        // this subtask wasn't yet received so it can't be dropped yet.
3351        if waitable.take_event(concurrent_state)?.is_some() {
3352            bail!(Trap::SubtaskDropNotResolved);
3353        }
3354
3355        if delete {
3356            waitable.delete_from(concurrent_state)?;
3357        }
3358
3359        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3360        Ok(())
3361    }
3362
3363    /// Implements the `waitable-set.wait` intrinsic.
3364    pub(crate) fn waitable_set_wait(
3365        self,
3366        store: &mut StoreOpaque,
3367        options: OptionsIndex,
3368        set: u32,
3369        payload: u32,
3370    ) -> Result<u32> {
3371        if !self.options(store, options).async_ {
3372            // The caller may only call `waitable-set.wait` from an async task
3373            // (i.e. a task created via a call to an async export).
3374            // Otherwise, we'll trap.
3375            store.check_blocking()?;
3376        }
3377
3378        let &CanonicalOptions {
3379            cancellable,
3380            instance: caller_instance,
3381            ..
3382        } = &self.id().get(store).component().env_component().options[options];
3383        let rep = store
3384            .instance_state(self.runtime_instance(caller_instance))
3385            .handle_table()
3386            .waitable_set_rep(set)?;
3387
3388        self.waitable_check(
3389            store,
3390            cancellable,
3391            WaitableCheck::Wait,
3392            WaitableCheckParams {
3393                set: TableId::new(rep),
3394                options,
3395                payload,
3396            },
3397        )
3398    }
3399
3400    /// Implements the `waitable-set.poll` intrinsic.
3401    pub(crate) fn waitable_set_poll(
3402        self,
3403        store: &mut StoreOpaque,
3404        options: OptionsIndex,
3405        set: u32,
3406        payload: u32,
3407    ) -> Result<u32> {
3408        let &CanonicalOptions {
3409            cancellable,
3410            instance: caller_instance,
3411            ..
3412        } = &self.id().get(store).component().env_component().options[options];
3413        let rep = store
3414            .instance_state(self.runtime_instance(caller_instance))
3415            .handle_table()
3416            .waitable_set_rep(set)?;
3417
3418        self.waitable_check(
3419            store,
3420            cancellable,
3421            WaitableCheck::Poll,
3422            WaitableCheckParams {
3423                set: TableId::new(rep),
3424                options,
3425                payload,
3426            },
3427        )
3428    }
3429
3430    /// Implements the `thread.index` intrinsic.
3431    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3432        let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3433        match store
3434            .concurrent_state_mut()
3435            .get_mut(thread_id)?
3436            .instance_rep
3437        {
3438            Some(r) => Ok(r),
3439            None => bail_bug!("thread should have instance_rep by now"),
3440        }
3441    }
3442
3443    /// Implements the `thread.new-indirect` intrinsic.
3444    pub(crate) fn thread_new_indirect<T: 'static>(
3445        self,
3446        mut store: StoreContextMut<T>,
3447        runtime_instance: RuntimeComponentInstanceIndex,
3448        _func_ty_idx: TypeFuncIndex, // currently unused
3449        start_func_table_idx: RuntimeTableIndex,
3450        start_func_idx: u32,
3451        context: i32,
3452    ) -> Result<u32> {
3453        log::trace!("creating new thread");
3454
3455        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3456        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3457        let callee = instance
3458            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3459            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3460        if callee.type_index(store.0) != start_func_ty.type_index() {
3461            bail!(Trap::ThreadNewIndirectInvalidType);
3462        }
3463
3464        let token = StoreToken::new(store.as_context_mut());
3465        let start_func = Box::new(
3466            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3467                let old_thread = store.set_thread(guest_thread)?;
3468                log::trace!(
3469                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3470                );
3471
3472                let mut store = token.as_context_mut(store);
3473                let mut params = [ValRaw::i32(context)];
3474                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3475                // on a separate fiber if we're running in an async store.
3476                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3477
3478                store.0.set_thread(old_thread)?;
3479
3480                store.0.cleanup_thread(
3481                    guest_thread,
3482                    self.runtime_instance(runtime_instance),
3483                    CleanupTask::Yes,
3484                )?;
3485                log::trace!("explicit thread {guest_thread:?} completed");
3486                let state = store.0.concurrent_state_mut();
3487                if let Some(t) = old_thread.guest() {
3488                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
3489                }
3490                log::trace!("thread start: restored {old_thread:?} as current thread");
3491
3492                Ok(())
3493            },
3494        );
3495
3496        let state = store.0.concurrent_state_mut();
3497        let current_thread = state.current_guest_thread()?;
3498        let parent_task = current_thread.task;
3499
3500        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3501        let thread_id = state.push(new_thread)?;
3502        state.get_mut(parent_task)?.threads.insert(thread_id);
3503
3504        log::trace!("new thread with id {thread_id:?} created");
3505
3506        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3507    }
3508
3509    pub(crate) fn resume_thread(
3510        self,
3511        store: &mut StoreOpaque,
3512        runtime_instance: RuntimeComponentInstanceIndex,
3513        thread_idx: u32,
3514        high_priority: bool,
3515        allow_ready: bool,
3516    ) -> Result<()> {
3517        let thread_id =
3518            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3519        let state = store.concurrent_state_mut();
3520        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3521        let thread = state.get_mut(guest_thread.thread)?;
3522
3523        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3524            GuestThreadState::NotStartedExplicit(start_func) => {
3525                log::trace!("starting thread {guest_thread:?}");
3526                let guest_call = WorkItem::GuestCall(
3527                    runtime_instance,
3528                    GuestCall {
3529                        thread: guest_thread,
3530                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3531                            start_func(store, guest_thread)
3532                        })),
3533                    },
3534                );
3535                store
3536                    .concurrent_state_mut()
3537                    .push_work_item(guest_call, high_priority);
3538            }
3539            GuestThreadState::Suspended(fiber) => {
3540                log::trace!("resuming thread {thread_id:?} that was suspended");
3541                store
3542                    .concurrent_state_mut()
3543                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3544            }
3545            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3546                log::trace!("resuming thread {thread_id:?} that was ready");
3547                thread.state = GuestThreadState::Ready { fiber, cancellable };
3548                store
3549                    .concurrent_state_mut()
3550                    .promote_thread_work_item(guest_thread);
3551            }
3552            other => {
3553                thread.state = other;
3554                bail!(Trap::CannotResumeThread);
3555            }
3556        }
3557        Ok(())
3558    }
3559
3560    fn add_guest_thread_to_instance_table(
3561        self,
3562        thread_id: TableId<GuestThread>,
3563        store: &mut StoreOpaque,
3564        runtime_instance: RuntimeComponentInstanceIndex,
3565    ) -> Result<u32> {
3566        let guest_id = store
3567            .instance_state(self.runtime_instance(runtime_instance))
3568            .thread_handle_table()
3569            .guest_thread_insert(thread_id.rep())?;
3570        store
3571            .concurrent_state_mut()
3572            .get_mut(thread_id)?
3573            .instance_rep = Some(guest_id);
3574        Ok(guest_id)
3575    }
3576
3577    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3578    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3579    pub(crate) fn suspension_intrinsic(
3580        self,
3581        store: &mut StoreOpaque,
3582        caller: RuntimeComponentInstanceIndex,
3583        cancellable: bool,
3584        yielding: bool,
3585        to_thread: SuspensionTarget,
3586    ) -> Result<WaitResult> {
3587        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3588        if to_thread.is_none() {
3589            let state = store.concurrent_state_mut();
3590            if yielding {
3591                // This is a `thread.yield` call
3592                if !state.may_block(guest_thread.task)? {
3593                    // In a non-blocking context, a `thread.yield` may trigger
3594                    // other threads in the same component instance to run.
3595                    if !state.promote_instance_local_thread_work_item(caller) {
3596                        // No other threads are runnable, so just return
3597                        return Ok(WaitResult::Completed);
3598                    }
3599                }
3600            } else {
3601                // The caller may only call `thread.suspend` from an async task
3602                // (i.e. a task created via a call to an async export).
3603                // Otherwise, we'll trap.
3604                store.check_blocking()?;
3605            }
3606        }
3607
3608        // There could be a pending cancellation from a previous uncancellable wait
3609        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3610            return Ok(WaitResult::Cancelled);
3611        }
3612
3613        match to_thread {
3614            SuspensionTarget::SomeSuspended(thread) => {
3615                self.resume_thread(store, caller, thread, true, false)?
3616            }
3617            SuspensionTarget::Some(thread) => {
3618                self.resume_thread(store, caller, thread, true, true)?
3619            }
3620            SuspensionTarget::None => { /* nothing to do */ }
3621        }
3622
3623        let reason = if yielding {
3624            SuspendReason::Yielding {
3625                thread: guest_thread,
3626                cancellable,
3627                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3628                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3629                // panic if we called it in a non-blocking context.
3630                skip_may_block_check: to_thread.is_some(),
3631            }
3632        } else {
3633            SuspendReason::ExplicitlySuspending {
3634                thread: guest_thread,
3635                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3636                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3637                // panic if we called it in a non-blocking context.
3638                skip_may_block_check: to_thread.is_some(),
3639            }
3640        };
3641
3642        store.suspend(reason)?;
3643
3644        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3645            Ok(WaitResult::Cancelled)
3646        } else {
3647            Ok(WaitResult::Completed)
3648        }
3649    }
3650
3651    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
        ///
        /// Runs on behalf of the current guest thread. For `WaitableCheck::Wait`,
        /// the thread is first suspended (via `StoreOpaque::suspend`) if its task
        /// has no deliverable event and `params.set` has nothing ready; for
        /// `WaitableCheck::Poll` that suspension step is skipped.
        ///
        /// Afterwards the next event (if any) is fetched with `get_event` and
        /// written to guest memory at `params.payload` as two 4-byte
        /// little-endian values: the waitable handle (0 when the event is not
        /// associated with a waitable) followed by the event's result payload.
        ///
        /// When `cancellable` is set, a pending `Event::Cancelled` counts as
        /// deliverable, and the thread records `params.set` in `wake_on_cancel`
        /// before it waits.
        ///
        /// Returns the event's ordinal. A poll that finds nothing reports the
        /// parts of `Event::None` with a handle of 0.
        ///
        /// # Errors
        ///
        /// Fails if a state lookup or the suspension fails, if `params.payload`
        /// is rejected by `validate_inbounds_dynamic`, or (as an internal bug) if
        /// the thread already had a `wake_on_cancel` set or a wait resumed
        /// without an event.
3652    fn waitable_check(
3653        self,
3654        store: &mut StoreOpaque,
3655        cancellable: bool,
3656        check: WaitableCheck,
3657        params: WaitableCheckParams,
3658    ) -> Result<u32> {
3659        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3660
3661        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3662
3663        let state = store.concurrent_state_mut();
3664        let task = state.get_mut(guest_thread.task)?;
3665
3666        // If we're waiting, and there are no events immediately available,
3667        // suspend the fiber until that changes.
3668        match &check {
3669            WaitableCheck::Wait => {
3670                let set = params.set;
3671
                    // A pending `Event::Cancelled` only counts as "available" when
                    // this wait is cancellable; otherwise it is treated like no
                    // event at all for the purpose of deciding whether to suspend.
3672                if (task.event.is_none()
3673                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3674                    && state.get_mut(set)?.ready.is_empty()
3675                {
3676                    if cancellable {
                            // Remember which set this thread is parked on so that
                            // `subtask_cancel` can remove it from the set's waiters
                            // and wake it if the task gets cancelled.
3677                        let old = state
3678                            .get_mut(guest_thread.thread)?
3679                            .wake_on_cancel
3680                            .replace(set);
3681                        if !old.is_none() {
3682                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3683                        }
3684                    }
3685
3686                    store.suspend(SuspendReason::Waiting {
3687                        set,
3688                        thread: guest_thread,
3689                        skip_may_block_check: false,
3690                    })?;
3691                }
3692            }
                // Polling does not suspend at this step; fall through to event delivery.
3693            WaitableCheck::Poll => {}
3694        }
3695
3696        log::trace!(
3697            "waitable check for {guest_thread:?}; set {:?}, part two",
3698            params.set
3699        );
3700
3701        // Deliver any pending events to the guest and return.
3702        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3703
3704        let (ordinal, handle, result) = match &check {
3705            WaitableCheck::Wait => {
                    // A wait only gets here once something is deliverable, so a
                    // missing event is treated as an internal bug.
3706                let (event, waitable) = match event {
3707                    Some(p) => p,
3708                    None => bail_bug!("event expected to be present"),
3709                };
                    // Events without an associated waitable report handle 0.
3710                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3711                let (ordinal, result) = event.parts();
3712                (ordinal, handle, result)
3713            }
3714            WaitableCheck::Poll => {
3715                if let Some((event, waitable)) = event {
3716                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3717                    let (ordinal, result) = event.parts();
3718                    (ordinal, handle, result)
3719                } else {
3720                    log::trace!(
3721                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3722                        guest_thread.task,
3723                        params.set
3724                    );
                        // Nothing ready: report `Event::None` with a zero handle.
3725                    let (ordinal, result) = Event::None.parts();
3726                    (ordinal, 0, result)
3727                }
3728            }
3729        };
3730        let memory = self.options_memory_mut(store, params.options);
            // Check the guest-supplied payload address against the pointer-pair
            // layout before indexing into `memory`.
3731        let ptr = crate::component::func::validate_inbounds_dynamic(
3732            &CanonicalAbiInfo::POINTER_PAIR,
3733            memory,
3734            &ValRaw::u32(params.payload),
3735        )?;
            // Payload layout: handle at offset 0, event result at offset 4, each
            // stored as 4 little-endian bytes.
3736        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3737        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3738        Ok(ordinal)
3739    }
3740
3741    /// Implements the `subtask.cancel` intrinsic.
        ///
        /// Resolves the guest-visible `task_id` handle in `caller_instance`'s
        /// handle table to either a host task or a guest task and requests its
        /// cancellation:
        ///
        /// * Host task: a callee that is still running has its `JoinHandle`
        ///   aborted.
        /// * Guest task that has not lowered its parameters yet: cancelled on the
        ///   spot, returning `Status::StartCancelled`.
        /// * Guest task that has started but not yet returned or cancelled:
        ///   `Event::Cancelled` is recorded on the task and, if one of its
        ///   threads is in a cancellable wait or yield, the caller yields so that
        ///   thread can run.
        ///
        /// If the subtask still has not reached a terminal state, an `async_`
        /// caller gets `BLOCKED` back immediately while a sync caller waits via
        /// `wait_for_event`. Otherwise the status carried by the pending
        /// `Event::Subtask` (`Status::Returned` or `Status::ReturnCancelled`) is
        /// returned.
        ///
        /// # Errors
        ///
        /// Fails with `Trap::SubtaskCancelAfterTerminal` if a host task was
        /// already cancelled, or if no `Returned`/`ReturnCancelled` event is
        /// pending once the subtask is terminal. A sync call (`async_ == false`)
        /// must additionally pass `store.check_blocking()`.
3742    pub(crate) fn subtask_cancel(
3743        self,
3744        store: &mut StoreOpaque,
3745        caller_instance: RuntimeComponentInstanceIndex,
3746        async_: bool,
3747        task_id: u32,
3748    ) -> Result<u32> {
3749        if !async_ {
3750            // The caller may only sync call `subtask.cancel` from an async task
3751            // (i.e. a task created via a call to an async export).  Otherwise,
3752            // we'll trap.
3753            store.check_blocking()?;
3754        }
3755
            // Translate the guest handle into a table rep and learn whether it
            // names a host task or a guest task.
3756        let (rep, is_host) = store
3757            .instance_state(self.runtime_instance(caller_instance))
3758            .handle_table()
3759            .subtask_rep(task_id)?;
3760        let waitable = if is_host {
3761            Waitable::Host(TableId::<HostTask>::new(rep))
3762        } else {
3763            Waitable::Guest(TableId::<GuestTask>::new(rep))
3764        };
3765        let concurrent_state = store.concurrent_state_mut();
3766
3767        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3768
            // Set exactly once on every path that reaches the blocking logic
            // below: whether the subtask's terminal event still has to be awaited.
3769        let needs_block;
3770        if let Waitable::Host(host_task) = waitable {
3771            let state = &mut concurrent_state.get_mut(host_task)?.state;
                // The state is eagerly overwritten with "done, cancelled"; the
                // previous state (returned by `mem::replace`) decides what to do.
3772            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3773                // If the callee is still running, signal an abort is requested.
3774                //
3775                // After cancelling this falls through to block waiting for the
3776                // host task to actually finish assuming that `async_` is false.
3777                // This blocking behavior resolves the race of `handle.abort()`
3778                // with the task actually getting cancelled or finishing.
3779                HostTaskState::CalleeRunning(handle) => {
3780                    handle.abort();
3781                    needs_block = true;
3782                }
3783
3784                // Cancellation was already requested, so fail as the task can't
3785                // be cancelled twice.
3786                HostTaskState::CalleeDone { cancelled } => {
3787                    if cancelled {
3788                        bail!(Trap::SubtaskCancelAfterTerminal);
3789                    } else {
3790                        // The callee is already done so there's no need to
3791                        // block further for an event.
3792                        needs_block = false;
3793                    }
3794                }
3795
3796                // These states should not be possible for a subtask that's
3797                // visible from the guest, so trap here.
3798                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3799                    bail_bug!("invalid states for host callee")
3800                }
3801            }
3802        } else {
3803            let guest_task = TableId::<GuestTask>::new(rep);
3804            let task = concurrent_state.get_mut(guest_task)?;
3805            if !task.already_lowered_parameters() {
                    // The callee has not consumed its parameters yet, so it can be
                    // cancelled immediately without delivering any event to it.
3806                store.cancel_guest_subtask_without_lowered_parameters(
3807                    self.runtime_instance(caller_instance),
3808                    guest_task,
3809                )?;
3810                return Ok(Status::StartCancelled as u32);
3811            } else if !task.returned_or_cancelled() {
3812                // Started, but not yet returned or cancelled; send the
3813                // `CANCELLED` event
3814                task.cancel_sent = true;
3815                // Note that this might overwrite an event that was set earlier
3816                // (e.g. `Event::None` if the task is yielding, or
3817                // `Event::Cancelled` if it was already cancelled), but that's
3818                // okay -- this should supersede the previous state.
3819                task.event = Some(Event::Cancelled);
3820                let runtime_instance = task.instance.index;
                    // Look for a thread of the task that can react to the
                    // cancellation right away. The loop stops after yielding to
                    // the first such thread (see the `break`s below).
3821                for thread in task.threads.clone() {
3822                    let thread = QualifiedThreadId {
3823                        task: guest_task,
3824                        thread,
3825                    };
3826                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
3827                    if let Some(set) = thread_mut.wake_on_cancel.take() {
3828                        // The thread is in a cancellable wait, so wake it up:
3829                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3830                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3831                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3832                                runtime_instance,
3833                                GuestCall {
3834                                    thread,
3835                                    kind: GuestCallKind::DeliverEvent {
3836                                        instance,
3837                                        set: None,
3838                                    },
3839                                },
3840                            ),
3841                            None => bail_bug!("thread not present in wake_on_cancel set"),
3842                        };
3843                        concurrent_state.push_high_priority(item);
3844
                            // Yield the calling thread so the work item queued
                            // above gets a chance to run.
3845                        let caller = concurrent_state.current_guest_thread()?;
3846                        store.suspend(SuspendReason::Yielding {
3847                            thread: caller,
3848                            cancellable: false,
3849                            // `subtask.cancel` is not allowed to be called in a
3850                            // sync context, so we cannot skip the may-block check.
3851                            skip_may_block_check: false,
3852                        })?;
3853                        break;
3854                    } else if let GuestThreadState::Ready {
3855                        cancellable: true, ..
3856                    } = &thread_mut.state
3857                    {
3858                        // The thread is in a cancellable yield, so yield back
3859                        // to it.
3860                        let caller = concurrent_state.current_guest_thread()?;
3861                        concurrent_state.promote_thread_work_item(thread);
3862                        store.suspend(SuspendReason::Yielding {
3863                            thread: caller,
3864                            cancellable: false,
3865                            skip_may_block_check: false,
3866                        })?;
3867                        break;
3868                    }
3869                }
3870
3871                // Guest tasks need to block if they have not yet returned or
3872                // cancelled, even as a result of the event delivery above.
3873                needs_block = !store
3874                    .concurrent_state_mut()
3875                    .get_mut(guest_task)?
3876                    .returned_or_cancelled()
3877            } else {
                    // Already returned or cancelled: nothing left to wait for.
3878                needs_block = false;
3879            }
3880        };
3881
3882        // If we need to block waiting on the terminal status of this subtask
3883        // then return immediately in `async` mode, or otherwise wait for the
3884        // event to get signaled through the store.
3885        if needs_block {
3886            if async_ {
3887                return Ok(BLOCKED);
3888            }
3889
3890            // Wait for this waitable to get signaled with its terminal status
3891            // from the completion callback enqueued by `first_poll`. Once
3892            // that's done fall through to the shared event handling below.
3893            store.wait_for_event(waitable)?;
3894
3895            // .. fall through to determine what event's in store for us.
3896        }
3897
            // Only a terminal `Returned` or `ReturnCancelled` subtask event is a
            // valid outcome here; anything else (including no event) traps.
3898        let event = waitable.take_event(store.concurrent_state_mut())?;
3899        if let Some(Event::Subtask {
3900            status: status @ (Status::Returned | Status::ReturnCancelled),
3901        }) = event
3902        {
3903            Ok(status as u32)
3904        } else {
3905            bail!(Trap::SubtaskCancelAfterTerminal);
3906        }
3907    }
3908}
3909
3910/// Trait representing component model ABI async intrinsics and fused adapter
3911/// helper functions.
3912///
3913/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3914/// which must be valid for at least the duration of the call (and possibly for
3915/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3916/// pointers used for async calls).
3917pub trait VMComponentAsyncStore {
3918    /// A helper function for fused adapter modules involving calls where at
3919    /// least one of the caller or callee is async.
3920    ///
3921    /// This helper is not used when the caller and callee both use the sync
3922    /// ABI; it is only used when at least one of them is async.
        ///
        /// `storage` must point to at least `storage_len` valid `ValRaw` items
        /// holding the call's parameters. `result_count` is either the number
        /// of results expected by a sync caller or one of the
        /// `PREPARE_ASYNC_NO_RESULT` / `PREPARE_ASYNC_WITH_RESULT` sentinels
        /// identifying an async caller (this is how the `StoreInner`
        /// implementation in this file interprets it).
3923    unsafe fn prepare_call(
3924        &mut self,
3925        instance: Instance,
3926        memory: *mut VMMemoryDefinition,
3927        start: NonNull<VMFuncRef>,
3928        return_: NonNull<VMFuncRef>,
3929        caller_instance: RuntimeComponentInstanceIndex,
3930        callee_instance: RuntimeComponentInstanceIndex,
3931        task_return_type: TypeTupleIndex,
3932        callee_async: bool,
3933        string_encoding: StringEncoding,
3934        result_count: u32,
3935        storage: *mut ValRaw,
3936        storage_len: usize,
3937    ) -> Result<()>;
3938
3939    /// A helper function for fused adapter modules involving calls where the
3940    /// caller is sync-lowered but the callee is async-lifted.
        ///
        /// `storage` must point to at least `storage_len` items; they are
        /// typed as possibly-uninitialized (`MaybeUninit<ValRaw>`).
3941    unsafe fn sync_start(
3942        &mut self,
3943        instance: Instance,
3944        callback: *mut VMFuncRef,
3945        callee: NonNull<VMFuncRef>,
3946        param_count: u32,
3947        storage: *mut MaybeUninit<ValRaw>,
3948        storage_len: usize,
3949    ) -> Result<()>;
3950
3951    /// A helper function for fused adapter modules involving calls where the
3952    /// caller is async-lowered.
        ///
        /// Unlike `sync_start`, this takes a `post_return` function, a
        /// `result_count`, and caller-supplied `flags`, and returns a `u32`.
3953    unsafe fn async_start(
3954        &mut self,
3955        instance: Instance,
3956        callback: *mut VMFuncRef,
3957        post_return: *mut VMFuncRef,
3958        callee: NonNull<VMFuncRef>,
3959        param_count: u32,
3960        result_count: u32,
3961        flags: u32,
3962    ) -> Result<u32>;
3963
3964    /// The `future.write` intrinsic.
        ///
        /// `future` is the guest handle and `address` the guest address passed
        /// to the intrinsic; the `u32` result is an encoded return code (the
        /// `StoreInner` implementation returns `encode()` of the write result).
3965    fn future_write(
3966        &mut self,
3967        instance: Instance,
3968        caller: RuntimeComponentInstanceIndex,
3969        ty: TypeFutureTableIndex,
3970        options: OptionsIndex,
3971        future: u32,
3972        address: u32,
3973    ) -> Result<u32>;
3974
3975    /// The `future.read` intrinsic.
        ///
        /// Mirrors `future_write`, but for the read direction.
3976    fn future_read(
3977        &mut self,
3978        instance: Instance,
3979        caller: RuntimeComponentInstanceIndex,
3980        ty: TypeFutureTableIndex,
3981        options: OptionsIndex,
3982        future: u32,
3983        address: u32,
3984    ) -> Result<u32>;
3985
3986    /// The `future.drop-writable` intrinsic.
3987    fn future_drop_writable(
3988        &mut self,
3989        instance: Instance,
3990        ty: TypeFutureTableIndex,
3991        writer: u32,
3992    ) -> Result<()>;
3993
3994    /// The `stream.write` intrinsic.
        ///
        /// Same shape as `future_write`, with an additional item `count`.
3995    fn stream_write(
3996        &mut self,
3997        instance: Instance,
3998        caller: RuntimeComponentInstanceIndex,
3999        ty: TypeStreamTableIndex,
4000        options: OptionsIndex,
4001        stream: u32,
4002        address: u32,
4003        count: u32,
4004    ) -> Result<u32>;
4005
4006    /// The `stream.read` intrinsic.
        ///
        /// Same shape as `future_read`, with an additional item `count`.
4007    fn stream_read(
4008        &mut self,
4009        instance: Instance,
4010        caller: RuntimeComponentInstanceIndex,
4011        ty: TypeStreamTableIndex,
4012        options: OptionsIndex,
4013        stream: u32,
4014        address: u32,
4015        count: u32,
4016    ) -> Result<u32>;
4017
4018    /// The "fast-path" implementation of the `stream.write` intrinsic for
4019    /// "flat" (i.e. memcpy-able) payloads.
        ///
        /// `payload_size` and `payload_align` describe the flat payload; the
        /// `StoreInner` implementation packs them into a `FlatAbi`.
4020    fn flat_stream_write(
4021        &mut self,
4022        instance: Instance,
4023        caller: RuntimeComponentInstanceIndex,
4024        ty: TypeStreamTableIndex,
4025        options: OptionsIndex,
4026        payload_size: u32,
4027        payload_align: u32,
4028        stream: u32,
4029        address: u32,
4030        count: u32,
4031    ) -> Result<u32>;
4032
4033    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
4034    /// (i.e. memcpy-able) payloads.
        ///
        /// `payload_size` and `payload_align` describe the flat payload; the
        /// `StoreInner` implementation packs them into a `FlatAbi`.
4035    fn flat_stream_read(
4036        &mut self,
4037        instance: Instance,
4038        caller: RuntimeComponentInstanceIndex,
4039        ty: TypeStreamTableIndex,
4040        options: OptionsIndex,
4041        payload_size: u32,
4042        payload_align: u32,
4043        stream: u32,
4044        address: u32,
4045        count: u32,
4046    ) -> Result<u32>;
4047
4048    /// The `stream.drop-writable` intrinsic.
4049    fn stream_drop_writable(
4050        &mut self,
4051        instance: Instance,
4052        ty: TypeStreamTableIndex,
4053        writer: u32,
4054    ) -> Result<()>;
4055
4056    /// The `error-context.debug-message` intrinsic.
4057    fn error_context_debug_message(
4058        &mut self,
4059        instance: Instance,
4060        ty: TypeComponentLocalErrorContextTableIndex,
4061        options: OptionsIndex,
4062        err_ctx_handle: u32,
4063        debug_msg_address: u32,
4064    ) -> Result<()>;
4065
4066    /// The `thread.new-indirect` intrinsic.
4067    fn thread_new_indirect(
4068        &mut self,
4069        instance: Instance,
4070        caller: RuntimeComponentInstanceIndex,
4071        func_ty_idx: TypeFuncIndex,
4072        start_func_table_idx: RuntimeTableIndex,
4073        start_func_idx: u32,
4074        context: i32,
4075    ) -> Result<u32>;
4076}
4077
4078/// SAFETY: See trait docs.
    ///
    /// Every method here is a thin forwarder to the same-named (or closely
    /// related) method on `Instance`, passing this store along.
4079impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4080    unsafe fn prepare_call(
4081        &mut self,
4082        instance: Instance,
4083        memory: *mut VMMemoryDefinition,
4084        start: NonNull<VMFuncRef>,
4085        return_: NonNull<VMFuncRef>,
4086        caller_instance: RuntimeComponentInstanceIndex,
4087        callee_instance: RuntimeComponentInstanceIndex,
4088        task_return_type: TypeTupleIndex,
4089        callee_async: bool,
4090        string_encoding: StringEncoding,
4091        result_count_or_max_if_async: u32,
4092        storage: *mut ValRaw,
4093        storage_len: usize,
4094    ) -> Result<()> {
4095        // SAFETY: The `wasmtime_cranelift`-generated code that calls
4096        // this method will have ensured that `storage` is a valid
4097        // pointer containing at least `storage_len` items.
            //
            // The parameters are copied into an owned `Vec`, so nothing below
            // keeps borrowing from `storage`.
4098        let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4099
            // NOTE(review): the remaining raw pointers are forwarded unchanged from
            // this method's caller, who is bound by the trait-level safety note;
            // `Instance::prepare_call`'s own requirements are not visible here.
4100        unsafe {
4101            instance.prepare_call(
4102                StoreContextMut(self),
4103                start,
4104                return_,
4105                caller_instance,
4106                callee_instance,
4107                task_return_type,
4108                callee_async,
4109                memory,
4110                string_encoding,
                    // The last argument doubles as a tag: the two `PREPARE_ASYNC_*`
                    // sentinels denote an async caller (without / with a result),
                    // any other value is a sync caller's result count.
4111                match result_count_or_max_if_async {
4112                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4113                        params,
4114                        has_result: false,
4115                    },
4116                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4117                        params,
4118                        has_result: true,
4119                    },
4120                    result_count => CallerInfo::Sync {
4121                        params,
4122                        result_count,
4123                    },
4124                },
4125            )
4126        }
4127    }
4128
4129    unsafe fn sync_start(
4130        &mut self,
4131        instance: Instance,
4132        callback: *mut VMFuncRef,
4133        callee: NonNull<VMFuncRef>,
4134        param_count: u32,
4135        storage: *mut MaybeUninit<ValRaw>,
4136        storage_len: usize,
4137    ) -> Result<()> {
            // Same `start_call` entry point as `async_start` below, but with a
            // null pointer in the `post_return` slot, a fixed `1` in the
            // `result_count` slot, the `START_FLAG_ASYNC_CALLEE` flag, and the
            // caller's storage passed as a slice. The `u32` that `start_call`
            // returns is discarded by `.map(drop)`.
4138        unsafe {
4139            instance
4140                .start_call(
4141                    StoreContextMut(self),
4142                    callback,
4143                    ptr::null_mut(),
4144                    callee,
4145                    param_count,
4146                    1,
4147                    START_FLAG_ASYNC_CALLEE,
4148                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4149                    // this method will have ensured that `storage` is a valid
4150                    // pointer containing at least `storage_len` items.
4151                    Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4152                )
4153                .map(drop)
4154        }
4155    }
4156
4157    unsafe fn async_start(
4158        &mut self,
4159        instance: Instance,
4160        callback: *mut VMFuncRef,
4161        post_return: *mut VMFuncRef,
4162        callee: NonNull<VMFuncRef>,
4163        param_count: u32,
4164        result_count: u32,
4165        flags: u32,
4166    ) -> Result<u32> {
            // Straight forwarder; the trailing `None` is the storage slot that
            // `sync_start` fills with a slice.
4167        unsafe {
4168            instance.start_call(
4169                StoreContextMut(self),
4170                callback,
4171                post_return,
4172                callee,
4173                param_count,
4174                result_count,
4175                flags,
4176                None,
4177            )
4178        }
4179    }
4180
4181    fn future_write(
4182        &mut self,
4183        instance: Instance,
4184        caller: RuntimeComponentInstanceIndex,
4185        ty: TypeFutureTableIndex,
4186        options: OptionsIndex,
4187        future: u32,
4188        address: u32,
4189    ) -> Result<u32> {
            // Shares `guest_write` with the stream intrinsics: `None` in the
            // flat-ABI slot and a fixed count of 1. The result is encoded into
            // the `u32` handed back to the guest.
4190        instance
4191            .guest_write(
4192                StoreContextMut(self),
4193                caller,
4194                TransmitIndex::Future(ty),
4195                options,
4196                None,
4197                future,
4198                address,
4199                1,
4200            )
4201            .map(|result| result.encode())
4202    }
4203
4204    fn future_read(
4205        &mut self,
4206        instance: Instance,
4207        caller: RuntimeComponentInstanceIndex,
4208        ty: TypeFutureTableIndex,
4209        options: OptionsIndex,
4210        future: u32,
4211        address: u32,
4212    ) -> Result<u32> {
            // Read-side counterpart of `future_write`: no flat ABI, count of 1.
4213        instance
4214            .guest_read(
4215                StoreContextMut(self),
4216                caller,
4217                TransmitIndex::Future(ty),
4218                options,
4219                None,
4220                future,
4221                address,
4222                1,
4223            )
4224            .map(|result| result.encode())
4225    }
4226
4227    fn stream_write(
4228        &mut self,
4229        instance: Instance,
4230        caller: RuntimeComponentInstanceIndex,
4231        ty: TypeStreamTableIndex,
4232        options: OptionsIndex,
4233        stream: u32,
4234        address: u32,
4235        count: u32,
4236    ) -> Result<u32> {
            // General (non-flat) stream write: `None` in the flat-ABI slot.
4237        instance
4238            .guest_write(
4239                StoreContextMut(self),
4240                caller,
4241                TransmitIndex::Stream(ty),
4242                options,
4243                None,
4244                stream,
4245                address,
4246                count,
4247            )
4248            .map(|result| result.encode())
4249    }
4250
4251    fn stream_read(
4252        &mut self,
4253        instance: Instance,
4254        caller: RuntimeComponentInstanceIndex,
4255        ty: TypeStreamTableIndex,
4256        options: OptionsIndex,
4257        stream: u32,
4258        address: u32,
4259        count: u32,
4260    ) -> Result<u32> {
            // General (non-flat) stream read: `None` in the flat-ABI slot.
4261        instance
4262            .guest_read(
4263                StoreContextMut(self),
4264                caller,
4265                TransmitIndex::Stream(ty),
4266                options,
4267                None,
4268                stream,
4269                address,
4270                count,
4271            )
4272            .map(|result| result.encode())
4273    }
4274
4275    fn future_drop_writable(
4276        &mut self,
4277        instance: Instance,
4278        ty: TypeFutureTableIndex,
4279        writer: u32,
4280    ) -> Result<()> {
            // Futures and streams share `guest_drop_writable`; only the
            // `TransmitIndex` variant differs.
4281        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4282    }
4283
4284    fn flat_stream_write(
4285        &mut self,
4286        instance: Instance,
4287        caller: RuntimeComponentInstanceIndex,
4288        ty: TypeStreamTableIndex,
4289        options: OptionsIndex,
4290        payload_size: u32,
4291        payload_align: u32,
4292        stream: u32,
4293        address: u32,
4294        count: u32,
4295    ) -> Result<u32> {
            // Identical to `stream_write` except that the payload's size and
            // alignment are supplied as a `FlatAbi`.
4296        instance
4297            .guest_write(
4298                StoreContextMut(self),
4299                caller,
4300                TransmitIndex::Stream(ty),
4301                options,
4302                Some(FlatAbi {
4303                    size: payload_size,
4304                    align: payload_align,
4305                }),
4306                stream,
4307                address,
4308                count,
4309            )
4310            .map(|result| result.encode())
4311    }
4312
4313    fn flat_stream_read(
4314        &mut self,
4315        instance: Instance,
4316        caller: RuntimeComponentInstanceIndex,
4317        ty: TypeStreamTableIndex,
4318        options: OptionsIndex,
4319        payload_size: u32,
4320        payload_align: u32,
4321        stream: u32,
4322        address: u32,
4323        count: u32,
4324    ) -> Result<u32> {
            // Identical to `stream_read` except that the payload's size and
            // alignment are supplied as a `FlatAbi`.
4325        instance
4326            .guest_read(
4327                StoreContextMut(self),
4328                caller,
4329                TransmitIndex::Stream(ty),
4330                options,
4331                Some(FlatAbi {
4332                    size: payload_size,
4333                    align: payload_align,
4334                }),
4335                stream,
4336                address,
4337                count,
4338            )
4339            .map(|result| result.encode())
4340    }
4341
4342    fn stream_drop_writable(
4343        &mut self,
4344        instance: Instance,
4345        ty: TypeStreamTableIndex,
4346        writer: u32,
4347    ) -> Result<()> {
            // Stream flavour of `future_drop_writable`.
4348        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4349    }
4350
4351    fn error_context_debug_message(
4352        &mut self,
4353        instance: Instance,
4354        ty: TypeComponentLocalErrorContextTableIndex,
4355        options: OptionsIndex,
4356        err_ctx_handle: u32,
4357        debug_msg_address: u32,
4358    ) -> Result<()> {
            // Pure forwarder: arguments are passed through in the same order.
4359        instance.error_context_debug_message(
4360            StoreContextMut(self),
4361            ty,
4362            options,
4363            err_ctx_handle,
4364            debug_msg_address,
4365        )
4366    }
4367
4368    fn thread_new_indirect(
4369        &mut self,
4370        instance: Instance,
4371        caller: RuntimeComponentInstanceIndex,
4372        func_ty_idx: TypeFuncIndex,
4373        start_func_table_idx: RuntimeTableIndex,
4374        start_func_idx: u32,
4375        context: i32,
4376    ) -> Result<u32> {
            // Pure forwarder: arguments are passed through in the same order.
4377        instance.thread_new_indirect(
4378            StoreContextMut(self),
4379            caller,
4380            func_ty_idx,
4381            start_func_table_idx,
4382            start_func_idx,
4383            context,
4384        )
4385    }
4386}
4387
    /// Type-erased future yielding `Result<()>`: pinned, boxed, `Send`, and
    /// `'static`.
    ///
    /// NOTE(review): presumably the future that drives a host task; its uses
    /// are outside this section of the file -- confirm.
4388type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4389
4390/// Represents the state of a pending host task.
4391///
4392/// This is used to represent tasks when the guest calls into the host.
4393pub(crate) struct HostTask {
        /// Waitable bookkeeping for this task.
        ///
        /// NOTE(review): inferred from the type name `WaitableCommon`; its
        /// definition is not visible in this section.
4394    common: WaitableCommon,
4395
4396    /// Guest thread which called the host.
4397    caller: QualifiedThreadId,
4398
4399    /// State of borrows/etc the host needs to track. Used when the guest passes
4400    /// borrows to the host, for example.
4401    call_context: CallContext,
4402
        /// Where this task currently is in its lifecycle; see `HostTaskState`.
4403    state: HostTaskState,
4404}
4405
    /// Lifecycle of a `HostTask`.
    ///
    /// `subtask_cancel` only expects to observe `CalleeRunning` or `CalleeDone`
    /// for a subtask the guest can name; it treats the other two states as an
    /// internal bug.
4406enum HostTaskState {
4407    /// A host task has been created and it's considered "started".
4408    ///
4409    /// The host task has yet to enter `first_poll` or `poll_and_block` which
4410    /// is where this will get updated further.
4411    CalleeStarted,
4412
4413    /// State used for tasks in `first_poll` meaning that the guest did an async
4414    /// lower of a host async function which is blocked. The specified handle is
4415    /// linked to the future in the main `FuturesUnordered` of a store which is
4416    /// used to cancel it if the guest requests cancellation.
4417    CalleeRunning(JoinHandle),
4418
4419    /// Terminal state used for tasks in `poll_and_block` to store the result of
4420    /// their computation. Note that this state is not used for tasks in
4421    /// `first_poll`.
4422    CalleeFinished(LiftedResult),
4423
4424    /// Terminal state for host tasks meaning that the task was cancelled or the
4425    /// result was taken.
        ///
        /// `cancelled` is set to `true` by `subtask_cancel`; a second
        /// cancellation request on such a task traps.
4426    CalleeDone { cancelled: bool },
4427}
4428
4429impl HostTask {
        /// Creates a host task called by `caller` that starts out in `state`.
        ///
        /// The `common` and `call_context` fields start from their `Default`
        /// values.
4430    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4431        Self {
4432            common: WaitableCommon::default(),
4433            call_context: CallContext::default(),
4434            caller,
4435            state,
4436        }
4437    }
4438}
4439
4440impl TableDebug for HostTask {
        /// Returns the fixed name `"HostTask"`.
        ///
        /// NOTE(review): presumably used to label table entries in debug
        /// output; the `TableDebug` trait is defined outside this section.
4441    fn type_name() -> &'static str {
4442        "HostTask"
4443    }
4444}
4445
    /// Boxed `Send + Sync + 'static` callback that receives the store, an
    /// `Event`, and a `u32`, and returns a `u32` on success.
    ///
    /// NOTE(review): presumably wraps the guest callback of an async-lifted
    /// export; the meaning of the `u32` argument and result is not visible in
    /// this section -- confirm against the call sites.
4446type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4447
4448/// Represents the caller of a given guest task.
4449enum Caller {
4450    /// The host called the guest task.
4451    Host {
4452        /// If present, may be used to deliver the result.
4453        tx: Option<oneshot::Sender<LiftedResult>>,
4454        /// If true, there's a host future that must be dropped before the task
4455        /// can be deleted.
4456        host_future_present: bool,
4457        /// Represents the caller of the host function which called back into a
4458        /// guest. Note that this thread could belong to an entirely unrelated
4459        /// top-level component instance than the one the host called into.
4460        caller: CurrentThread,
4461    },
4462    /// Another guest thread called the guest task.
4463    Guest {
4464        /// The id of the calling guest thread, qualified by its task.
4465        thread: QualifiedThreadId,
4466    },
4467}
4468
4469/// Represents a closure and related canonical ABI parameters required to
4470/// validate a `task.return` call at runtime and lift the result.
4471struct LiftResult {
        /// The lifting closure mentioned above.
4472    lift: RawLift,
        /// NOTE(review): presumably the expected `task.return` type (compare the
        /// `task_return_type: TypeTupleIndex` parameter of `prepare_call`) --
        /// confirm.
4473    ty: TypeTupleIndex,
        /// Memory definition to use, if any.
        ///
        /// NOTE(review): when this is `None` is not visible in this section.
4474    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
        /// String encoding that goes with this lift.
4475    string_encoding: StringEncoding,
4476}
4477
4478/// The table ID for a guest thread, qualified by the task to which it belongs.
4479///
4480/// This exists to minimize table lookups and the necessity to pass stores around mutably
4481/// for the common case of identifying the task to which a thread belongs.
4482#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4483pub(crate) struct QualifiedThreadId {
        /// The task that owns `thread` (its `parent_task`).
4484    task: TableId<GuestTask>,
        /// The guest thread itself.
4485    thread: TableId<GuestThread>,
4486}
4487
4488impl QualifiedThreadId {
        /// Builds the qualified id for `thread` by looking it up in `state` and
        /// reading its `parent_task`.
        ///
        /// Fails if `thread` cannot be looked up in `state`.
4489    fn qualify(
4490        state: &mut ConcurrentState,
4491        thread: TableId<GuestThread>,
4492    ) -> Result<QualifiedThreadId> {
4493        Ok(QualifiedThreadId {
4494            task: state.get_mut(thread)?.parent_task,
4495            thread,
4496        })
4497    }
4498}
4499
4500impl fmt::Debug for QualifiedThreadId {
        /// Formats as a tuple struct, `QualifiedThreadId(<task rep>, <thread rep>)`,
        /// printing the `rep()` of each table id instead of the ids' own `Debug`
        /// output.
4501    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4502        f.debug_tuple("QualifiedThreadId")
4503            .field(&self.task.rep())
4504            .field(&self.thread.rep())
4505            .finish()
4506    }
4507}
4508
    /// Execution state of a `GuestThread`.
    ///
    /// NOTE(review): the transitions between these states happen outside this
    /// section; the per-variant notes below only state what is visible here.
4509enum GuestThreadState {
        /// Not started yet. `GuestThread::new_implicit` creates threads in this
        /// state.
4510    NotStartedImplicit,
        /// Not started yet; carries a one-shot closure taking the store and the
        /// thread's qualified id.
        ///
        /// NOTE(review): presumably the thread's entry point -- confirm.
4511    NotStartedExplicit(
4512        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4513    ),
        /// NOTE(review): presumably the thread is currently executing -- confirm.
4514    Running,
        /// Suspended, holding the fiber to resume later.
4515    Suspended(StoreFiber<'static>),
        /// Holds a fiber together with a `cancellable` flag. `subtask_cancel`
        /// treats `cancellable: true` as "the thread is in a cancellable yield".
4516    Ready {
4517        fiber: StoreFiber<'static>,
4518        cancellable: bool,
4519    },
        /// NOTE(review): presumably the thread has finished running -- confirm.
4520    Completed,
4521}

/// Represents a guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// Created alongside the thread with `is_sync_call_set` set to `true`.
    sync_call_set: TableId<WaitableSet>,
}
4539
4540impl GuestThread {
4541    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4542    /// handle.
4543    fn from_instance(
4544        state: Pin<&mut ComponentInstance>,
4545        caller_instance: RuntimeComponentInstanceIndex,
4546        guest_thread: u32,
4547    ) -> Result<TableId<Self>> {
4548        let rep = state.instance_states().0[caller_instance]
4549            .thread_handle_table()
4550            .guest_thread_rep(guest_thread)?;
4551        Ok(TableId::new(rep))
4552    }
4553
4554    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4555        let sync_call_set = state.push(WaitableSet {
4556            is_sync_call_set: true,
4557            ..WaitableSet::default()
4558        })?;
4559        Ok(Self {
4560            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4561            parent_task,
4562            wake_on_cancel: None,
4563            state: GuestThreadState::NotStartedImplicit,
4564            instance_rep: None,
4565            sync_call_set,
4566        })
4567    }
4568
4569    fn new_explicit(
4570        state: &mut ConcurrentState,
4571        parent_task: TableId<GuestTask>,
4572        start_func: Box<
4573            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4574        >,
4575    ) -> Result<Self> {
4576        let sync_call_set = state.push(WaitableSet {
4577            is_sync_call_set: true,
4578            ..WaitableSet::default()
4579        })?;
4580        Ok(Self {
4581            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4582            parent_task,
4583            wake_on_cancel: None,
4584            state: GuestThreadState::NotStartedExplicit(start_func),
4585            instance_rep: None,
4586            sync_call_set,
4587        })
4588    }
4589}
4590
impl TableDebug for GuestThread {
    /// Human-readable name for this kind of table entry.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4596
/// Tracks the result stashed in `GuestTask::sync_result`.
enum SyncResult {
    /// No result has been stored yet.
    NotProduced,
    /// A result (possibly with no value) has been stored and not yet taken.
    Produced(Option<ValRaw>),
    /// `SyncResult::take` has already been called; calling it again is a bug.
    Taken,
}
4602
4603impl SyncResult {
4604    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4605        Ok(match mem::replace(self, SyncResult::Taken) {
4606            SyncResult::NotProduced => None,
4607            SyncResult::Produced(val) => Some(val),
4608            SyncResult::Taken => {
4609                bail_bug!("attempted to take a synchronous result that was already taken")
4610            }
4611        })
4612    }
4613}
4614
/// State of the host future associated with a `GuestTask`, if any.
///
/// A task is not considered ready to delete while this is `Live`.
#[derive(Debug)]
enum HostFutureState {
    /// Used for guest callers and for host callers with no host future.
    NotApplicable,
    /// Used for host callers which do have a host future.
    Live,
    /// NOTE(review): presumably set once the host future has been dropped --
    /// confirm where this is assigned.
    Dropped,
}
4621
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// Reset to `None` once the parameters have been lowered.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Reset to `None` once the task has returned or been cancelled.
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
    ///
    /// Keeps track of `borrow<T>` received to this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,

    /// Whether the increment of `ConcurrentState::interesting_tasks` performed
    /// by `GuestTask::new` has been paired with a decrement.
    ///
    /// This is checked with a `debug_assert!` when the task is deleted.
    decremented_interesting_task_count: bool,
}
4675
4676impl GuestTask {
4677    fn already_lowered_parameters(&self) -> bool {
4678        // We reset `lower_params` after we lower the parameters
4679        self.lower_params.is_none()
4680    }
4681
4682    fn returned_or_cancelled(&self) -> bool {
4683        // We reset `lift_result` after we return or exit
4684        self.lift_result.is_none()
4685    }
4686
4687    fn ready_to_delete(&self) -> bool {
4688        let threads_completed = self.threads.is_empty();
4689        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4690        let pending_completion_event = matches!(
4691            self.common.event,
4692            Some(Event::Subtask {
4693                status: Status::Returned | Status::ReturnCancelled
4694            })
4695        );
4696        let ready = threads_completed
4697            && !has_sync_result
4698            && !pending_completion_event
4699            && !matches!(self.host_future_state, HostFutureState::Live);
4700        log::trace!(
4701            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4702            threads_completed,
4703            has_sync_result,
4704            pending_completion_event,
4705            self.host_future_state
4706        );
4707        ready
4708    }
4709
4710    fn new(
4711        state: &mut ConcurrentState,
4712        lower_params: RawLower,
4713        lift_result: LiftResult,
4714        caller: Caller,
4715        callback: Option<CallbackFn>,
4716        instance: RuntimeInstance,
4717        async_function: bool,
4718    ) -> Result<QualifiedThreadId> {
4719        let host_future_state = match &caller {
4720            Caller::Guest { .. } => HostFutureState::NotApplicable,
4721            Caller::Host {
4722                host_future_present,
4723                ..
4724            } => {
4725                if *host_future_present {
4726                    HostFutureState::Live
4727                } else {
4728                    HostFutureState::NotApplicable
4729                }
4730            }
4731        };
4732        let task = state.push(Self {
4733            common: WaitableCommon::default(),
4734            lower_params: Some(lower_params),
4735            lift_result: Some(lift_result),
4736            result: None,
4737            callback,
4738            caller,
4739            call_context: CallContext::default(),
4740            sync_result: SyncResult::NotProduced,
4741            cancel_sent: false,
4742            starting_sent: false,
4743            instance,
4744            event: None,
4745            exited: false,
4746            threads: HashSet::new(),
4747            host_future_state,
4748            async_function,
4749            decremented_interesting_task_count: false,
4750        })?;
4751        let new_thread = GuestThread::new_implicit(state, task)?;
4752        let thread = state.push(new_thread)?;
4753        state.get_mut(task)?.threads.insert(thread);
4754        state.interesting_tasks += 1;
4755        Ok(QualifiedThreadId { task, thread })
4756    }
4757}
4758
impl TableDebug for GuestTask {
    /// Human-readable name for this kind of table entry.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4764
/// Represents state common to all kinds of waitables.
///
/// Each kind of waitable (host task, guest task, and stream/future end)
/// stores one of these in its `common` field; see `Waitable::common`.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4775
/// Represents a Component Model Async `waitable`.
///
/// The ordering traits are derived so that values can be stored in the
/// `BTreeSet` used by `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4786
4787impl Waitable {
4788    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4789    /// handle.
4790    fn from_instance(
4791        state: Pin<&mut ComponentInstance>,
4792        caller_instance: RuntimeComponentInstanceIndex,
4793        waitable: u32,
4794    ) -> Result<Self> {
4795        use crate::runtime::vm::component::Waitable;
4796
4797        let (waitable, kind) = state.instance_states().0[caller_instance]
4798            .handle_table()
4799            .waitable_rep(waitable)?;
4800
4801        Ok(match kind {
4802            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4803            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4804            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4805        })
4806    }
4807
4808    /// Retrieve the host-visible identifier for this `Waitable`.
4809    fn rep(&self) -> u32 {
4810        match self {
4811            Self::Host(id) => id.rep(),
4812            Self::Guest(id) => id.rep(),
4813            Self::Transmit(id) => id.rep(),
4814        }
4815    }
4816
4817    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4818    /// remove it from any set it may currently belong to (when `set` is
4819    /// `None`).
4820    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4821        log::trace!("waitable {self:?} join set {set:?}");
4822
4823        let old = mem::replace(&mut self.common(state)?.set, set);
4824
4825        if let Some(old) = old {
4826            match *self {
4827                Waitable::Host(id) => state.remove_child(id, old),
4828                Waitable::Guest(id) => state.remove_child(id, old),
4829                Waitable::Transmit(id) => state.remove_child(id, old),
4830            }?;
4831
4832            state.get_mut(old)?.ready.remove(self);
4833        }
4834
4835        if let Some(set) = set {
4836            match *self {
4837                Waitable::Host(id) => state.add_child(id, set),
4838                Waitable::Guest(id) => state.add_child(id, set),
4839                Waitable::Transmit(id) => state.add_child(id, set),
4840            }?;
4841
4842            if self.common(state)?.event.is_some() {
4843                self.mark_ready(state)?;
4844            }
4845        }
4846
4847        Ok(())
4848    }
4849
4850    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4851    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4852        Ok(match self {
4853            Self::Host(id) => &mut state.get_mut(*id)?.common,
4854            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4855            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4856        })
4857    }
4858
4859    /// Set or clear the pending event for this waitable and either deliver it
4860    /// to the first waiter, if any, or mark it as ready to be delivered to the
4861    /// next waiter that arrives.
4862    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4863        log::trace!("set event for {self:?}: {event:?}");
4864        self.common(state)?.event = event;
4865        self.mark_ready(state)
4866    }
4867
4868    /// Take the pending event from this waitable, leaving `None` in its place.
4869    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4870        let common = self.common(state)?;
4871        let event = common.event.take();
4872        if let Some(set) = self.common(state)?.set {
4873            state.get_mut(set)?.ready.remove(self);
4874        }
4875
4876        Ok(event)
4877    }
4878
4879    /// Deliver the current event for this waitable to the first waiter, if any,
4880    /// or else mark it as ready to be delivered to the next waiter that
4881    /// arrives.
4882    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4883        if let Some(set) = self.common(state)?.set {
4884            state.get_mut(set)?.ready.insert(*self);
4885            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4886                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4887                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4888
4889                let item = match mode {
4890                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4891                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4892                        state.get_mut(thread.task)?.instance.index,
4893                        GuestCall {
4894                            thread,
4895                            kind: GuestCallKind::DeliverEvent {
4896                                instance,
4897                                set: Some(set),
4898                            },
4899                        },
4900                    ),
4901                };
4902                state.push_high_priority(item);
4903            }
4904        }
4905        Ok(())
4906    }
4907
4908    /// Remove this waitable from the instance's rep table.
4909    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4910        match self {
4911            Self::Host(task) => {
4912                log::trace!("delete host task {task:?}");
4913                state.delete(*task)?;
4914            }
4915            Self::Guest(task) => {
4916                log::trace!("delete guest task {task:?}");
4917                let task = state.delete(*task)?;
4918
4919                // When a guest task is created it increments the
4920                // `ConcurrentState::interesting_tasks` counter, and that needs
4921                // to be paired with a decrement. There are a few situations in
4922                // which the decrement needs to happen which don't all funnel
4923                // through here, so in lieu of that at least try to catch issues
4924                // where we forgot to do a decrement.
4925                debug_assert!(task.decremented_interesting_task_count);
4926            }
4927            Self::Transmit(task) => {
4928                state.delete(*task)?;
4929            }
4930        }
4931
4932        Ok(())
4933    }
4934}
4935
4936impl fmt::Debug for Waitable {
4937    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4938        match self {
4939            Self::Host(id) => write!(f, "{id:?}"),
4940            Self::Guest(id) => write!(f, "{id:?}"),
4941            Self::Transmit(id) => write!(f, "{id:?}"),
4942        }
4943    }
4944}
4945
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any.
    ///
    /// `Waitable::mark_ready` wakes waiters with `pop_first`, i.e. in
    /// ascending `QualifiedThreadId` order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
    /// Whether this set is a synthetic, internal one meant for handling
    /// synchronous calls.
    is_sync_call_set: bool,
}
4957
impl TableDebug for WaitableSet {
    /// Human-readable name for this kind of table entry.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4963
4964/// Type-erased closure to lower the parameters for a guest task.
4965type RawLower =
4966    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4967
4968/// Type-erased closure to lift the result for a guest task.
4969type RawLift = Box<
4970    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4971>;
4972
/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
4976type LiftedResult = Box<dyn Any + Send + Sync>;
4977
4978/// Used to return a result from a `LiftFn` when the actual result has already
4979/// been lowered to a guest task's stack and linear memory.
4980struct DummyResult;
4981
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Whether backpressure is set for this instance (enabled if >0)
    backpressure: u16,
    /// Whether this instance can be entered
    do_not_enter: bool,
    /// Pending calls for this instance which are waiting on
    /// `Self::backpressure` and/or `Self::do_not_enter` before they can
    /// proceed.
    ///
    /// NOTE(review): this comment previously said `Self::backpressure` must be
    /// `true`, but that field is a counter ("enabled if >0"); presumably calls
    /// proceed once backpressure is disabled (0) and `Self::do_not_enter` is
    /// false -- confirm against the code which drains this map.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4993
impl ConcurrentInstanceState {
    /// Returns `true` if no calls are currently queued in `Self::pending`.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4999
/// Identifies the currently running thread, if any; see
/// `ConcurrentState::current_thread`.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, identified together with its owning task.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// No thread is current.
    None,
}
5006
5007impl CurrentThread {
5008    fn guest(&self) -> Option<&QualifiedThreadId> {
5009        match self {
5010            Self::Guest(id) => Some(id),
5011            _ => None,
5012        }
5013    }
5014
5015    fn host(&self) -> Option<TableId<HostTask>> {
5016        match self {
5017            Self::Host(id) => Some(*id),
5018            _ => None,
5019        }
5020    }
5021
5022    fn is_none(&self) -> bool {
5023        matches!(self, Self::None)
5024    }
5025}
5026
5027impl From<QualifiedThreadId> for CurrentThread {
5028    fn from(id: QualifiedThreadId) -> Self {
5029        Self::Guest(id)
5030    }
5031}
5032
5033impl From<TableId<HostTask>> for CurrentThread {
5034    fn from(id: TableId<HostTask>) -> Self {
5035        Self::Host(id)
5036    }
5037}
5038
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running thread, if any.
    current_thread: CurrentThread,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `poll_until` for where we temporarily take this out, poll it, then
    /// put it back to avoid any mutable aliasing hazards.
    ///
    /// NOTE(review): this comment previously cited
    /// `ComponentInstance::poll_until`, whereas the module-level docs name
    /// `StoreContextMut::poll_until` as the function containing the event loop
    /// -- confirm which is current.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    low_priority: VecDeque<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,

    /// The number of "interesting tasks" currently executing in the store.
    ///
    /// This tracks the concept of a component instance lifetime as defined in
    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
    /// all tasks currently increment this counter which then gets decremented
    /// when they exit. In the future some tasks might not increment this
    /// counter, but for now all do.
    ///
    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
    /// inform the embedder when all tasks have completed. This is then
    /// used in wasmtime-wasi-http, for example, to know when an instance is
    /// idle.
    interesting_tasks: usize,

    /// Single waker to notify when `interesting_tasks` reaches 0.
    ///
    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
    interesting_tasks_empty_waker: Option<Waker>,
}
5100
5101impl Default for ConcurrentState {
5102    fn default() -> Self {
5103        Self {
5104            current_thread: CurrentThread::None,
5105            table: AlwaysMut::new(ResourceTable::new()),
5106            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5107            high_priority: Vec::new(),
5108            low_priority: VecDeque::new(),
5109            suspend_reason: None,
5110            worker: None,
5111            worker_item: None,
5112            global_error_context_ref_counts: BTreeMap::new(),
5113            interesting_tasks: 0,
5114            interesting_tasks_empty_waker: None,
5115        }
5116    }
5117}
5118
5119impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.  This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // First sweep the table: fibers may be parked either in a waitable
        // set's `waiting` map or in a guest thread's state.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                // Note that the thread is marked `Completed` regardless of
                // which state it was in.
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        // The cached worker fiber, if any.
        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Queued work items may own fibers or not-yet-pushed futures; the
        // latter are moved into `self.futures` so they are collected below.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        // Finally hand over the futures themselves, leaving `None` behind.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }
5187
5188    fn push<V: Send + Sync + 'static>(
5189        &mut self,
5190        value: V,
5191    ) -> Result<TableId<V>, ResourceTableError> {
5192        self.table.get_mut().push(value).map(TableId::from)
5193    }
5194
5195    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5196        self.table.get_mut().get_mut(&Resource::from(id))
5197    }
5198
5199    pub fn add_child<T: 'static, U: 'static>(
5200        &mut self,
5201        child: TableId<T>,
5202        parent: TableId<U>,
5203    ) -> Result<(), ResourceTableError> {
5204        self.table
5205            .get_mut()
5206            .add_child(Resource::from(child), Resource::from(parent))
5207    }
5208
5209    pub fn remove_child<T: 'static, U: 'static>(
5210        &mut self,
5211        child: TableId<T>,
5212        parent: TableId<U>,
5213    ) -> Result<(), ResourceTableError> {
5214        self.table
5215            .get_mut()
5216            .remove_child(Resource::from(child), Resource::from(parent))
5217    }
5218
5219    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5220        self.table.get_mut().delete(Resource::from(id))
5221    }
5222
5223    fn push_future(&mut self, future: HostTaskFuture) {
5224        // Note that we can't directly push to `ConcurrentState::futures` here
5225        // since this may be called from a future that's being polled inside
5226        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5227        // so it has exclusive access while polling it.  Therefore, we push a
5228        // work item to the "high priority" queue, which will actually push to
5229        // `ConcurrentState::futures` later.
5230        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5231    }
5232
5233    fn push_high_priority(&mut self, item: WorkItem) {
5234        log::trace!("push high priority: {item:?}");
5235        self.high_priority.push(item);
5236    }
5237
5238    fn push_low_priority(&mut self, item: WorkItem) {
5239        log::trace!("push low priority: {item:?}");
5240        self.low_priority.push_front(item);
5241    }
5242
5243    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5244        if high_priority {
5245            self.push_high_priority(item);
5246        } else {
5247            self.push_low_priority(item);
5248        }
5249    }
5250
5251    fn promote_instance_local_thread_work_item(
5252        &mut self,
5253        current_instance: RuntimeComponentInstanceIndex,
5254    ) -> bool {
5255        self.promote_work_items_matching(|item: &WorkItem| match item {
5256            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5257                *instance == current_instance
5258            }
5259            _ => false,
5260        })
5261    }
5262
5263    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5264        self.promote_work_items_matching(|item: &WorkItem| match item {
5265            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5266                *t == thread
5267            }
5268            _ => false,
5269        })
5270    }
5271
5272    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5273    where
5274        F: FnMut(&WorkItem) -> bool,
5275    {
5276        // If there's a high-priority work item to resume the current guest thread,
5277        // we don't need to promote anything, but we return true to indicate that
5278        // work is pending for the current instance.
5279        if self.high_priority.iter().any(&mut predicate) {
5280            true
5281        }
5282        // Otherwise, look for a low-priority work item that matches the current
5283        // instance and promote it to high-priority.
5284        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5285            let item = self.low_priority.remove(idx).unwrap();
5286            self.push_high_priority(item);
5287            true
5288        } else {
5289            false
5290        }
5291    }
5292
5293    /// Returns whether there's a pending cancellation on the current guest thread,
5294    /// consuming the event if so.
5295    fn take_pending_cancellation(&mut self) -> Result<bool> {
5296        let thread = self.current_guest_thread()?;
5297        if let Some(event) = self.get_mut(thread.task)?.event.take() {
5298            assert!(matches!(event, Event::Cancelled));
5299            Ok(true)
5300        } else {
5301            Ok(false)
5302        }
5303    }
5304
5305    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5306        if self.may_block(task)? {
5307            Ok(())
5308        } else {
5309            Err(Trap::CannotBlockSyncTask.into())
5310        }
5311    }
5312
5313    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5314        let task = self.get_mut(task)?;
5315        Ok(task.async_function || task.returned_or_cancelled())
5316    }
5317
5318    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5319    /// specified task.
5320    ///
5321    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5322    /// below.
5323    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5324        let (task, is_host) = (task >> 1, task & 1 == 1);
5325        if is_host {
5326            let task: TableId<HostTask> = TableId::new(task);
5327            Ok(&mut self.get_mut(task)?.call_context)
5328        } else {
5329            let task: TableId<GuestTask> = TableId::new(task);
5330            Ok(&mut self.get_mut(task)?.call_context)
5331        }
5332    }
5333
5334    /// Used by `ResourceTables` to record the scope of a borrow to get undone
5335    /// in the future.
5336    pub fn current_call_context_scope_id(&self) -> Result<u32> {
5337        let (bits, is_host) = match self.current_thread {
5338            CurrentThread::Guest(id) => (id.task.rep(), false),
5339            CurrentThread::Host(id) => (id.rep(), true),
5340            CurrentThread::None => bail_bug!("current thread is not set"),
5341        };
5342        assert_eq!((bits << 1) >> 1, bits);
5343        Ok((bits << 1) | u32::from(is_host))
5344    }
5345
5346    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5347        match self.current_thread.guest() {
5348            Some(id) => Ok(*id),
5349            None => bail_bug!("current thread is not a guest thread"),
5350        }
5351    }
5352
5353    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5354        match self.current_thread.host() {
5355            Some(id) => Ok(id),
5356            None => bail_bug!("current thread is not a host thread"),
5357        }
5358    }
5359
5360    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5361        match self.futures.get_mut().as_mut() {
5362            Some(f) => Ok(f),
5363            None => bail_bug!("futures field of concurrent state is currently taken"),
5364        }
5365    }
5366
    /// Returns mutable access to the `ResourceTable` held in this state's
    /// `table` field.
    pub(crate) fn table(&mut self) -> &mut ResourceTable {
        self.table.get_mut()
    }
5370
5371    /// Returns the parent thread, if any, of `cur`.
5372    fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5373        match cur {
5374            CurrentThread::Guest(thread) => {
5375                let task = self.get_mut(thread.task).ok()?;
5376                Some(match task.caller {
5377                    Caller::Host { caller, .. } => caller,
5378                    Caller::Guest { thread } => thread.into(),
5379                })
5380            }
5381            CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5382            CurrentThread::None => None,
5383        }
5384    }
5385}
5386
5387/// Provide a type hint to compiler about the shape of a parameter lower
5388/// closure.
5389fn for_any_lower<
5390    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5391>(
5392    fun: F,
5393) -> F {
5394    fun
5395}
5396
5397/// Provide a type hint to compiler about the shape of a result lift closure.
5398fn for_any_lift<
5399    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5400>(
5401    fun: F,
5402) -> F {
5403    fun
5404}
5405
5406fn check_ambient_store(id: StoreId) {
5407    let message = "\
5408        `Future`s which depend on asynchronous component tasks, streams, or \
5409        futures to complete may only be polled from the event loop of the \
5410        store to which they belong.  Please use \
5411        `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5412    ";
5413    tls::try_get(|store| {
5414        let matched = match store {
5415            tls::TryGet::Some(store) => store.id() == id,
5416            tls::TryGet::Taken | tls::TryGet::None => false,
5417        };
5418
5419        if !matched {
5420            panic!("{message}")
5421        }
5422    });
5423}
5424
5425/// Assert that `StoreContextMut::run_concurrent` has not been called from
5426/// within an store's event loop.
5427fn check_recursive_run() {
5428    tls::try_get(|store| {
5429        if !matches!(store, tls::TryGet::None) {
5430            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5431        }
5432    });
5433}
5434
/// Splits a packed callback code into its two parts: the low four bits and
/// the remaining upper bits (shifted down), in that order.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    let low = code & ((1 << LOW_BITS) - 1);
    let high = code >> LOW_BITS;
    (low, high)
}
5438
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set the call operates on.
    set: TableId<WaitableSet>,
    /// Index of the options associated with the call.
    // NOTE(review): presumably the canonical options of the calling
    // intrinsic -- confirm against `waitable_check`.
    options: OptionsIndex,
    /// Raw `u32` payload argument of the call.
    // NOTE(review): presumably a guest address for the event payload --
    // confirm against `waitable_check`.
    payload: u32,
}
5447
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The check is for `waitable-set.wait`.
    Wait,
    /// The check is for `waitable-set.poll`.
    Poll,
}
5454
/// An identifier representing a guest task within a component.
///
/// This can be acquired by calling [`Func::start_call_concurrent`] or
/// [`TypedFunc::start_call_concurrent`] and then using the
/// [`FuncCallConcurrent::task`] accessor, for example. This can then be
/// reflected on with [`StoreContextMut::async_call_stack`].
///
/// Internally this is a thin wrapper around the task's `TableId<GuestTask>`.
///
/// [`TypedFunc::start_call_concurrent`]: crate::component::TypedFunc::start_call_concurrent
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct GuestTaskId(TableId<GuestTask>);
5465
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// The type parameter `R` is carried only as a marker (see `_phantom`); no
/// value of type `R` is stored here.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The instance that this call is prepared for.
    runtime_instance: RuntimeInstance,
    /// Zero-sized marker for the result type `R`.
    _phantom: PhantomData<R>,
}
5481
5482impl<R> PreparedCall<R> {
5483    /// Get a copy of the `TaskId` for this `PreparedCall`.
5484    pub(crate) fn task_id(&self) -> TaskId {
5485        TaskId {
5486            task: self.thread.task,
5487            runtime_instance: self.runtime_instance,
5488        }
5489    }
5490}
5491
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
    /// The runtime instance the call was prepared for.
    runtime_instance: RuntimeInstance,
}
5497
5498impl TaskId {
5499    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5500    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5501    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5502    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5503    /// and delete the task when all threads are done.
5504    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5505        let task = store.concurrent_state_mut().get_mut(self.task)?;
5506        let delete = if !task.already_lowered_parameters() {
5507            store.cancel_guest_subtask_without_lowered_parameters(
5508                self.runtime_instance,
5509                self.task,
5510            )?;
5511            true
5512        } else {
5513            task.host_future_state = HostFutureState::Dropped;
5514            task.ready_to_delete()
5515        };
5516        if delete {
5517            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5518        }
5519        Ok(())
5520    }
5521}
5522
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
///
/// # Parameters
///
/// - `handle`: the function to call.
/// - `param_count`: stored unchanged in the returned `PreparedCall`.
/// - `host_future_present`: recorded in the new task's `Caller::Host`.
/// - `lower_params` / `lift_result`: boxed and handed to `GuestTask::new`;
///   each is invoked with `handle` as its first argument.
///
/// # Errors
///
/// Fails with `Trap::CannotEnterComponent` if `may_enter` reports that the
/// target instance may not be entered, and propagates any error from
/// `GuestTask::new` or `may_enter`.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather the per-function ABI details (async-ness, result type, callback,
    // memory, string encoding) from the component's metadata.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sending half goes into the task's `Caller::Host`; the receiving
    // half is returned to the caller inside the `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    // From here on `instance` is the `RuntimeInstance` the call targets, not
    // the instance data looked up above.
    let instance = handle.instance().runtime_instance(component_instance);
    // The thread that is current while the call is prepared is recorded as
    // the new task's caller.
    let caller = state.current_thread;
    let thread = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // NOTE(review): this check runs after `GuestTask::new`, and nothing in
    // this function removes the new task when the check fails -- confirm
    // that is intended.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5608
/// A call that has been queued via `QueuedCall::new`.
///
/// This is a `Future` that resolves to the call's result of type `R`.
pub(crate) struct QueuedCall<R> {
    /// Id of the store the call was queued on; checked against the ambient
    /// store each time the future is polled.
    store: StoreId,
    /// Table id of the guest task running the call.
    task: TableId<GuestTask>,
    /// Receiver on which the lifted result of the call arrives.
    rx: oneshot::Receiver<LiftedResult>,
    /// Zero-sized marker for the result type `R`.
    _marker: PhantomData<fn() -> R>,
}
5615
5616impl<R> QueuedCall<R> {
5617    /// Queue a call previously prepared using `prepare_call` to be run as part of
5618    /// the associated `ComponentInstance`'s event loop.
5619    ///
5620    /// The returned future will resolve to the result once it is available, but
5621    /// must only be polled via the instance's event loop. See
5622    /// `StoreContextMut::run_concurrent` for details.
5623    pub(crate) fn new<T: 'static>(
5624        mut store: StoreContextMut<T>,
5625        prepared: PreparedCall<R>,
5626    ) -> Result<QueuedCall<R>> {
5627        let PreparedCall {
5628            handle,
5629            thread,
5630            param_count,
5631            rx,
5632            ..
5633        } = prepared;
5634
5635        queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5636
5637        Ok(QueuedCall {
5638            store: store.0.id(),
5639            task: thread.task,
5640            rx,
5641            _marker: PhantomData,
5642        })
5643    }
5644
5645    fn task(&self) -> GuestTaskId {
5646        GuestTaskId(self.task)
5647    }
5648}
5649
5650impl<R> Future for QueuedCall<R>
5651where
5652    R: 'static,
5653{
5654    type Output = Result<R>;
5655
5656    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5657        check_ambient_store(self.store);
5658        Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5659            Ok(r) => match r.downcast() {
5660                Ok(r) => Ok(*r),
5661                Err(_) => bail_bug!("wrong type of value produced"),
5662            },
5663            Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5664        })
5665    }
5666}
5667
5668/// Queue a call previously prepared using `prepare_call` to be run as part of
5669/// the associated `ComponentInstance`'s event loop.
5670fn queue_call0<T: 'static>(
5671    store: StoreContextMut<T>,
5672    handle: Func,
5673    guest_thread: QualifiedThreadId,
5674    param_count: usize,
5675) -> Result<()> {
5676    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5677    let is_concurrent = raw_options.async_;
5678    let callback = raw_options.callback;
5679    let instance = handle.instance();
5680    let callee = handle.lifted_core_func(store.0);
5681    let post_return = handle.post_return_core_func(store.0);
5682    let callback = callback.map(|i| {
5683        let instance = instance.id().get(store.0);
5684        SendSyncPtr::new(instance.runtime_callback(i))
5685    });
5686
5687    log::trace!("queueing call {guest_thread:?}");
5688
5689    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5690    // (with signatures appropriate for this call) and will remain valid as
5691    // long as this instance is valid.
5692    unsafe {
5693        instance.queue_call(
5694            store,
5695            guest_thread,
5696            SendSyncPtr::new(callee),
5697            param_count,
5698            1,
5699            is_concurrent,
5700            callback,
5701            post_return.map(SendSyncPtr::new),
5702        )
5703    }
5704}