Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{self, Func, call_post_return};
56use crate::component::{
57    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61use crate::prelude::*;
62use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
63use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
64use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
65use crate::{
66    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
67};
68use alloc::borrow::ToOwned;
69use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
70use core::any::Any;
71use core::cell::UnsafeCell;
72use core::fmt;
73use core::future::Future;
74use core::marker::PhantomData;
75use core::mem::{self, ManuallyDrop, MaybeUninit};
76use core::ops::DerefMut;
77use core::pin::{Pin, pin};
78use core::ptr::{self, NonNull};
79use core::task::{Context, Poll, Waker};
80use futures::channel::oneshot;
81use futures::future::{self, FutureExt};
82use futures::stream::{FuturesUnordered, StreamExt};
83use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
87    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
89    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
90    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
91};
92use wasmtime_environ::packed_option::ReservedValue;
93use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
94
95pub use abort::JoinHandle;
96pub use future_stream_any::{FutureAny, StreamAny};
97pub use futures_and_streams::{
98    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
99    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
100    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
101};
102pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
103
104mod abort;
105mod error_contexts;
106mod future_stream_any;
107mod futures_and_streams;
108pub(crate) mod table;
109pub(crate) mod tls;
110
/// Sentinel defined by the Component Model spec: the value an async intrinsic
/// (e.g. `future.write`) yields when it has not yet completed.
const BLOCKED: u32 = u32::MAX;
114
/// Corresponds to `CallState` in the upstream spec.
///
/// The explicit discriminants are the values written into the low bits of the
/// word produced by [`Status::pack`].
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Encodes this status, together with the optional `waitable`, as the
    /// single 32-bit value required by the canonical ABI.
    ///
    /// The status occupies the low 4 bits; the waitable, when present,
    /// occupies the upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics unless a waitable is supplied exactly when the status is
    /// something other than `Returned`, or if the waitable does not fit in 28
    /// bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
138
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// `Event::parts` lowers a value of this type to the pair of core Wasm
/// integers handed to the guest.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a zero payload.
    None,
    /// A subtask event; `status` is lowered as the payload.
    Subtask {
        status: Status,
    },
    /// A `stream` read event; `code` is lowered as the payload.
    StreamRead {
        code: ReturnCode,
        // Not included in the lowered pair. NOTE(review): presumably names the
        // stream table entry the result is still pending for -- confirm.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A `stream` write event; `code` is lowered as the payload.
    StreamWrite {
        code: ReturnCode,
        // Not included in the lowered pair; see the note on `StreamRead`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A `future` read event; `code` is lowered as the payload.
    FutureRead {
        code: ReturnCode,
        // Not included in the lowered pair. NOTE(review): presumably names the
        // future table entry the result is still pending for -- confirm.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A `future` write event; `code` is lowered as the payload.
    FutureWrite {
        code: ReturnCode,
        // Not included in the lowered pair; see the note on `FutureRead`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A cancellation event; lowered with a zero payload.
    Cancelled,
}
165
166impl Event {
167    /// Lower this event to core Wasm integers for delivery to the guest.
168    ///
169    /// Note that the waitable handle, if any, is assumed to be lowered
170    /// separately.
171    fn parts(self) -> (u32, u32) {
172        const EVENT_NONE: u32 = 0;
173        const EVENT_SUBTASK: u32 = 1;
174        const EVENT_STREAM_READ: u32 = 2;
175        const EVENT_STREAM_WRITE: u32 = 3;
176        const EVENT_FUTURE_READ: u32 = 4;
177        const EVENT_FUTURE_WRITE: u32 = 5;
178        const EVENT_CANCELLED: u32 = 6;
179        match self {
180            Event::None => (EVENT_NONE, 0),
181            Event::Cancelled => (EVENT_CANCELLED, 0),
182            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
183            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
184            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
185            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
186            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
187        }
188    }
189}
190
/// Corresponds to `CallbackCode` in the spec.
///
/// NOTE(review): the per-constant descriptions below are inferred from the
/// constant names; confirm the exact semantics against the spec.
mod callback_code {
    /// Presumably: the task is finished and should exit.
    pub const EXIT: u32 = 0;
    /// Presumably: the task yields so that other tasks may run.
    pub const YIELD: u32 = 1;
    /// Presumably: the task waits for an event before continuing.
    pub const WAIT: u32 = 2;
}
197
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// The value is the `wasmtime_environ` constant of the same name, cast to
/// `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
202
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    // The store context borrowed for `'a`; backs `data_mut`, `as_context`
    // and `as_context_mut`.
    store: StoreContextMut<'a, T>,
    // Projection from the store data `&mut T` to `D::Data<'_>`; applied by
    // `get` and handed out unchanged by `getter`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
212
213impl<'a, T, D> Access<'a, T, D>
214where
215    D: HasData + ?Sized,
216    T: 'static,
217{
218    /// Creates a new [`Access`] from its component parts.
219    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
220        Self { store, get_data }
221    }
222
223    /// Get mutable access to the store data.
224    pub fn data_mut(&mut self) -> &mut T {
225        self.store.data_mut()
226    }
227
228    /// Get mutable access to the store data.
229    pub fn get(&mut self) -> D::Data<'_> {
230        (self.get_data)(self.data_mut())
231    }
232
233    /// Spawn a background task.
234    ///
235    /// See [`Accessor::spawn`] for details.
236    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
237    where
238        T: 'static,
239    {
240        let accessor = Accessor {
241            get_data: self.get_data,
242            token: StoreToken::new(self.store.as_context_mut()),
243        };
244        self.store
245            .as_context_mut()
246            .spawn_with_accessor(accessor, task)
247    }
248
249    /// Returns the getter this accessor is using to project from `T` into
250    /// `D::Data`.
251    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
252        self.get_data
253    }
254}
255
256impl<'a, T, D> AsContext for Access<'a, T, D>
257where
258    D: HasData + ?Sized,
259    T: 'static,
260{
261    type Data = T;
262
263    fn as_context(&self) -> StoreContext<'_, T> {
264        self.store.as_context()
265    }
266}
267
268impl<'a, T, D> AsContextMut for Access<'a, T, D>
269where
270    D: HasData + ?Sized,
271    T: 'static,
272{
273    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
274        self.store.as_context_mut()
275    }
276}
277
278/// Provides scoped mutable access to store data in the context of a concurrent
279/// host task future.
280///
281/// This allows multiple host task futures to execute concurrently and access
282/// the store between (but not across) `await` points.
283///
284/// # Rationale
285///
286/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
287/// `D::Data<'_>`. The problem this is solving, however, is that it does not
288/// literally store these values. The basic problem is that when a concurrent
289/// host future is being polled it has access to `&mut T` (and the whole
290/// `Store`) but when it's not being polled it does not have access to these
291/// values. This reflects how the store is only ever polling one future at a
292/// time so the store is effectively being passed between futures.
293///
294/// Rust's `Future` trait, however, has no means of passing a `Store`
295/// temporarily between futures. The [`Context`](core::task::Context) type does
296/// not have the ability to attach arbitrary information to it at this time.
297/// This type, [`Accessor`], is used to bridge this expressivity gap.
298///
299/// The [`Accessor`] type here represents the ability to acquire, temporarily in
300/// a synchronous manner, the current store. The [`Accessor::with`] function
301/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
302/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
303/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
305/// how the store is temporarily made available while a host future is being
306/// polled.
307///
308/// # Implementation
309///
310/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
311/// this type additionally doesn't even have a lifetime parameter. This is
312/// instead a representation of proof of the ability to acquire these while a
313/// future is being polled. Wasmtime will, when it polls a host future,
314/// configure ambient state such that the `Accessor` that a future closes over
315/// will work and be able to access the store.
316///
317/// This has a number of implications for users such as:
318///
319/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
320///   the lifetime of a single future.
321/// * A future is expected to, however, close over an `Accessor` and keep it
322///   alive probably for the duration of the entire future.
323/// * Different host futures will be given different `Accessor`s, and that's
324///   intentional.
325/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
326///   alleviates some otherwise required bounds to be written down.
327///
328/// # Using `Accessor` in `Drop`
329///
330/// The methods on `Accessor` are only expected to work in the context of
331/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
332/// host future can be dropped at any time throughout the system and Wasmtime
333/// store context is not necessarily available at that time. It's recommended to
334/// not use `Accessor` methods in anything connected to a `Drop` implementation
335/// as they will panic and have unintended results. If you run into this though
336/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    // Token used by `with` to turn the ambient (TLS) store into a
    // `StoreContextMut<T>`; no store reference is held here.
    token: StoreToken<T>,
    // Projection from the store data `&mut T` to `D::Data<'_>`; passed on to
    // every `Access` created by `with`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
344
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The accessor is handed out by reference only; [`Accessor`] is
    /// intentionally not cloneable.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
372
373impl<T: AsAccessor + ?Sized> AsAccessor for &T {
374    type Data = T::Data;
375    type AccessorData = T::AccessorData;
376
377    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
378        T::as_accessor(self)
379    }
380}
381
382impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
383    type Data = T;
384    type AccessorData = D;
385
386    fn as_accessor(&self) -> &Accessor<T, D> {
387        self
388    }
389}
390
391// Note that it is intentional at this time that `Accessor` does not actually
392// store `&mut T` or anything similar. This distinctly enables the `Accessor`
393// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
394// that matter). This is used to ergonomically simplify bindings where the
395// majority of the time `Accessor` is closed over in a future which then needs
396// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
397// you already have to write `T: 'static`...) it helps to avoid this.
398//
399// Note as well that `Accessor` doesn't actually store its data at all. Instead
400// it's more of a "proof" of what can be accessed from TLS. API design around
401// `Accessor` and functions like `Linker::func_wrap_concurrent` are
402// intentionally made to ensure that `Accessor` is ideally only used in the
403// context that TLS variables are actually set. For example host functions are
404// given `&Accessor`, not `Accessor`, and this prevents them from persisting
405// the value outside of a future. Within the future the TLS variables are all
406// guaranteed to be set while the future is being polled.
407//
408// Finally though this is not an ironclad guarantee, but nor does it need to be.
409// The TLS APIs are designed to panic or otherwise model usage where they're
410// called recursively or similar. It's hoped that code cannot be constructed to
411// actually hit this at runtime but this is not a safety requirement at this
412// time.
413const _: () = {
414    const fn assert<T: Send + Sync>() {}
415    assert::<Accessor<UnsafeCell<u32>>>();
416};
417
418impl<T> Accessor<T> {
419    /// Creates a new `Accessor` backed by the specified functions.
420    ///
421    /// - `get`: used to retrieve the store
422    ///
423    /// - `get_data`: used to "project" from the store's associated data to
424    /// another type (e.g. a field of that data or a wrapper around it).
425    ///
426    /// - `spawn`: used to queue spawned background tasks to be run later
427    pub(crate) fn new(token: StoreToken<T>) -> Self {
428        Self {
429            token,
430            get_data: |x| x,
431        }
432    }
433}
434
435impl<T, D> Accessor<T, D>
436where
437    D: HasData + ?Sized,
438{
439    /// Run the specified closure, passing it mutable access to the store.
440    ///
441    /// This function is one of the main building blocks of the [`Accessor`]
442    /// type. This yields synchronous, blocking, access to the store via an
443    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
444    /// providing the ability to access `D` via [`Access::get`]. Note that the
445    /// `fun` here is given only temporary access to the store and `T`/`D`
446    /// meaning that the return value `R` here is not allowed to capture borrows
447    /// into the two. If access is needed to data within `T` or `D` outside of
448    /// this closure then it must be `clone`d out, for example.
449    ///
450    /// # Panics
451    ///
452    /// This function will panic if it is call recursively with any other
453    /// accessor already in scope. For example if `with` is called within `fun`,
454    /// then this function will panic. It is up to the embedder to ensure that
455    /// this does not happen.
456    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
457        tls::get(|vmstore| {
458            fun(Access {
459                store: self.token.as_context_mut(vmstore),
460                get_data: self.get_data,
461            })
462        })
463    }
464
465    /// Returns the getter this accessor is using to project from `T` into
466    /// `D::Data`.
467    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
468        self.get_data
469    }
470
471    /// Changes this accessor to access `D2` instead of the current type
472    /// parameter `D`.
473    ///
474    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
475    ///
476    /// # Panics
477    ///
478    /// When using this API the returned value is disconnected from `&self` and
479    /// the lifetime binding the `self` argument. An `Accessor` only works
480    /// within the context of the closure or async closure that it was
481    /// originally given to, however. This means that due to the fact that the
482    /// returned value has no lifetime connection it's possible to use the
483    /// accessor outside of `&self`, the original accessor, and panic.
484    ///
485    /// The returned value should only be used within the scope of the original
486    /// `Accessor` that `self` refers to.
487    pub fn with_getter<D2: HasData>(
488        &self,
489        get_data: fn(&mut T) -> D2::Data<'_>,
490    ) -> Accessor<T, D2> {
491        Accessor {
492            token: self.token,
493            get_data,
494        }
495    }
496
497    /// Spawn a background task which will receive an `&Accessor<T, D>` and
498    /// run concurrently with any other tasks in progress for the current
499    /// store.
500    ///
501    /// This is particularly useful for host functions which return a `stream`
502    /// or `future` such that the code to write to the write end of that
503    /// `stream` or `future` must run after the function returns.
504    ///
505    /// The returned [`JoinHandle`] may be used to cancel the task.
506    ///
507    /// # Panics
508    ///
509    /// Panics if called within a closure provided to the [`Accessor::with`]
510    /// function. This can only be called outside an active invocation of
511    /// [`Accessor::with`].
512    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
513    where
514        T: 'static,
515    {
516        let accessor = self.clone_for_spawn();
517        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
518    }
519
520    fn clone_for_spawn(&self) -> Self {
521        Self {
522            token: self.token,
523            get_data: self.get_data,
524        }
525    }
526
527    /// Polls to see if this store contains any "interesting" tasks still within
528    /// it.
529    ///
530    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
531    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
532    /// the last remaining "interesting" task has exited the provided context's
533    /// waker will be notified. Note that only the waker passed to the last call
534    /// to `poll_no_interesting_tasks` will be notified, so this is only
535    /// appropriate to use one-at-a-time.
536    ///
537    /// The component model specification, as of this current date, does not
538    /// have a distinction between "interesting" tasks and not. The current
539    /// intention is that in a future revision of the component model this will
540    /// be distinguished at the component ABI level where tasks will be able to
541    /// flag themselves as "interesting" optionally. Additionally extra work can
542    /// be opted-in to being "interesting".
543    ///
544    /// For now what this means is that all component model tasks within this
545    /// store are considered interesting. This specifically includes the entire
546    /// duration of a task, so even all of the time after a task has returned
547    /// but before it has exited. This means that this function is, today,
548    /// effectively a proxy for "are there any more tasks still running in this
549    /// store". This can be used by embedders to determine whether there's any
550    /// more work going on, even in the background, for a particular guest.
551    /// Hosts can use this as a signal that the guest wants to stay alive a
552    /// little longer, even after a task has returned.
553    ///
554    /// In the future this predicate won't include all tasks in this store. Some
555    /// tasks will be able to flag themselves as not interesting, meaning that
556    /// when this returns ready it'd be possible that there are still tasks
557    /// remaining in the store.
558    ///
559    /// Note that at this time spawned threads within a task are always
560    /// considered uninteresting. If this function returns ready, then spawned
561    /// threads may still be in the store.
562    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
563        self.with(|mut access| {
564            let store = access.as_context_mut().0;
565            let state = store.concurrent_state_mut();
566            if state.interesting_tasks == 0 {
567                Poll::Ready(())
568            } else {
569                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
570                Poll::Pending
571            }
572        })
573    }
574}
575
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
///
/// Implementations must be `Send + 'static`, and the future returned by
/// [`AccessorTask::run`] must be `Send`.
// TODO: Replace this with `core::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and borrows `accessor` for access to the store; the
    /// returned future resolves to `Ok(())` on success or an error otherwise.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
594
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        // NOTE(review): presumably the caller's flattened core Wasm
        // parameter values -- confirm against the adapter code.
        params: Vec<ValRaw>,
        // Presumably whether the call produces a result -- TODO confirm.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        // NOTE(review): presumably the caller's flattened core Wasm
        // parameter values -- confirm against the adapter code.
        params: Vec<ValRaw>,
        // Presumably the number of core Wasm results expected by the caller
        // -- TODO confirm.
        result_count: u32,
    },
}
609
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// Carries a `StoreFiber`; presumably the suspended fiber to resume once
    /// an event arrives -- TODO confirm.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    ///
    /// Carries an `Instance`; presumably the instance whose callback will be
    /// invoked -- TODO confirm.
    Callback(Instance),
}
618
/// Represents the reason a fiber is suspending itself.
///
/// NOTE(review): the `thread` fields presumably name the guest thread doing
/// the suspending, and `skip_may_block_check` presumably bypasses a
/// "may this task block?" check performed on suspension -- neither is shown
/// in this part of the file; confirm against the code that handles
/// `SuspendReason`.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        // The waitable set being waited on.
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        thread: QualifiedThreadId,
        // Presumably whether the yield may be interrupted by cancellation --
        // TODO confirm.
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
645
/// Represents a pending call into guest code for a given guest task.
///
/// `Debug` is implemented by hand for this type; the closures held by the
/// `Start*` variants are not printed.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Like `StartImplicit`, except that the closure cannot hand back a
    /// follow-up call, and `GuestCall::is_ready` always reports this kind of
    /// call as ready.
    ///
    /// NOTE(review): what makes a start "explicit" is not shown in this part
    /// of the file -- confirm and document.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
667
668impl fmt::Debug for GuestCallKind {
669    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
670        match self {
671            Self::DeliverEvent { instance, set } => f
672                .debug_struct("DeliverEvent")
673                .field("instance", instance)
674                .field("set", set)
675                .finish(),
676            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
677            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
678        }
679    }
680}
681
/// The target of a suspension intrinsic.
///
/// NOTE(review): the `u32` payloads presumably identify the thread being
/// targeted -- confirm against the intrinsic implementations.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// True only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// True for every variant that names a target, i.e. anything other than
    /// [`SuspensionTarget::None`].
    fn is_some(&self) -> bool {
        match self {
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => true,
            SuspensionTarget::None => false,
        }
    }
}
698
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    // The guest thread the call is for; `is_ready` uses `thread.task` to look
    // up the owning task and its instance.
    thread: QualifiedThreadId,
    // What kind of call is pending.
    kind: GuestCallKind,
}
705
706impl GuestCall {
707    /// Returns whether or not the call is ready to run.
708    ///
709    /// A call will not be ready to run if either:
710    ///
711    /// - the (sub-)component instance to be called has already been entered and
712    /// cannot be reentered until an in-progress call completes
713    ///
714    /// - the call is for a not-yet started task and the (sub-)component
715    /// instance to be called has backpressure enabled
716    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
717        let instance = store
718            .concurrent_state_mut()
719            .get_mut(self.thread.task)?
720            .instance;
721        let state = store.instance_state(instance).concurrent_state();
722
723        let ready = match &self.kind {
724            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
725            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
726            GuestCallKind::StartExplicit(_) => true,
727        };
728        log::trace!(
729            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
730            state.do_not_enter,
731            state.backpressure
732        );
733        Ok(ready)
734    }
735}
736
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure which is given access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
742
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// `Debug` is implemented by hand for this type; the future, fiber and
/// closure payloads are not printed.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume.
    ///
    /// NOTE(review): the `RuntimeComponentInstanceIndex` is presumably the
    /// instance the thread runs in -- confirm.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task.
    ///
    /// NOTE(review): the `RuntimeComponentInstanceIndex` is presumably the
    /// instance being called into -- confirm.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
757
758impl fmt::Debug for WorkItem {
759    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760        match self {
761            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
762            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
763            Self::ResumeThread(instance, thread) => f
764                .debug_tuple("ResumeThread")
765                .field(instance)
766                .field(thread)
767                .finish(),
768            Self::GuestCall(instance, call) => f
769                .debug_tuple("GuestCall")
770                .field(instance)
771                .field(call)
772                .finish(),
773            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
774        }
775    }
776}
777
/// Whether a suspension intrinsic was cancelled or completed
///
/// Note that the derived ordering follows declaration order, i.e.
/// `Cancelled < Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension ended because it was cancelled.
    Cancelled,
    /// The suspension ended because it completed.
    Completed,
}
784
785/// Poll the specified future until it completes on behalf of a guest->host call
786/// using a sync-lowered import.
787///
788/// This is similar to `Instance::first_poll` except it's for sync-lowered
789/// imports, meaning we don't need to handle cancellation and we can block the
790/// caller until the task completes, at which point the caller can handle
791/// lowering the result to the guest's stack and linear memory.
792pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
793    store: &mut dyn VMStore,
794    future: impl Future<Output = Result<R>> + Send + 'static,
795) -> Result<R> {
796    let state = store.concurrent_state_mut();
797    let task = state.current_host_thread()?;
798
799    // Wrap the future in a closure which will take care of stashing the result
800    // in `GuestTask::result` and resuming this fiber when the host task
801    // completes.
802    let mut future = Box::pin(async move {
803        let result = future.await?;
804        tls::get(move |store| {
805            let state = store.concurrent_state_mut();
806            let host_state = &mut state.get_mut(task)?.state;
807            assert!(matches!(host_state, HostTaskState::CalleeStarted));
808            *host_state = HostTaskState::CalleeFinished(Box::new(result));
809
810            Waitable::Host(task).set_event(
811                state,
812                Some(Event::Subtask {
813                    status: Status::Returned,
814                }),
815            )?;
816
817            Ok(())
818        })
819    }) as HostTaskFuture;
820
821    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
822    // add the future to `ConcurrentState::futures` and poll it automatically
823    // from the event loop if it doesn't complete immediately here.
824    let poll = tls::set(store, || {
825        future
826            .as_mut()
827            .poll(&mut Context::from_waker(&Waker::noop()))
828    });
829
830    match poll {
831        // It completed immediately; check the result and delete the task.
832        Poll::Ready(result) => result?,
833
834        // It did not complete immediately; add it to
835        // `ConcurrentState::futures` so it will be polled via the event loop;
836        // then use `GuestThread::sync_call_set` to wait for the task to
837        // complete, suspending the current fiber until it does so.
838        Poll::Pending => {
839            let state = store.concurrent_state_mut();
840            state.push_future(future);
841
842            let caller = state.get_mut(task)?.caller;
843            let set = state.get_mut(caller.thread)?.sync_call_set;
844            Waitable::Host(task).join(state, Some(set))?;
845
846            store.suspend(SuspendReason::Waiting {
847                set,
848                thread: caller,
849                skip_may_block_check: false,
850            })?;
851
852            // Remove the `task` from the `sync_call_set` to ensure that when
853            // this function returns and the task is deleted that there are no
854            // more lingering references to this host task.
855            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
856        }
857    }
858
859    // Retrieve and return the result.
860    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
861    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
862        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
863            Ok(result) => *result,
864            Err(_) => bail_bug!("host task finished with wrong type of result"),
865        }),
866        _ => bail_bug!("unexpected host task state after completion"),
867    }
868}
869
870/// Execute the specified guest call.
871fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
872    let mut next = Some(call);
873    while let Some(call) = next.take() {
874        match call.kind {
875            GuestCallKind::DeliverEvent { instance, set } => {
876                let (event, waitable) =
877                    match instance.get_event(store, call.thread.task, set, true)? {
878                        Some(pair) => pair,
879                        None => bail_bug!("delivering non-present event"),
880                    };
881                let state = store.concurrent_state_mut();
882                let task = state.get_mut(call.thread.task)?;
883                let runtime_instance = task.instance;
884                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
885
886                log::trace!(
887                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
888                    call.thread,
889                );
890
891                let old_thread = store.set_thread(call.thread)?;
892                log::trace!(
893                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
894                    call.thread
895                );
896
897                store.enter_instance(runtime_instance);
898
899                let Some(callback) = store
900                    .concurrent_state_mut()
901                    .get_mut(call.thread.task)?
902                    .callback
903                    .take()
904                else {
905                    bail_bug!("guest task callback field not present")
906                };
907
908                let code = callback(store, event, handle)?;
909
910                store
911                    .concurrent_state_mut()
912                    .get_mut(call.thread.task)?
913                    .callback = Some(callback);
914
915                store.exit_instance(runtime_instance)?;
916
917                store.set_thread(old_thread)?;
918
919                next = instance.handle_callback_code(
920                    store,
921                    call.thread,
922                    runtime_instance.index,
923                    code,
924                )?;
925
926                log::trace!(
927                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
928                );
929            }
930            GuestCallKind::StartImplicit(fun) => {
931                next = fun(store)?;
932            }
933            GuestCallKind::StartExplicit(fun) => {
934                fun(store)?;
935            }
936        }
937    }
938
939    Ok(())
940}
941
942impl<T> Store<T> {
943    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
944    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
945    where
946        T: Send + 'static,
947    {
948        ensure!(
949            self.as_context().0.concurrency_support(),
950            "cannot use `run_concurrent` when Config::concurrency_support disabled",
951        );
952        self.as_context_mut().run_concurrent(fun).await
953    }
954
955    #[doc(hidden)]
956    pub fn assert_concurrent_state_empty(&mut self) {
957        self.as_context_mut().assert_concurrent_state_empty();
958    }
959
960    #[doc(hidden)]
961    pub fn concurrent_state_table_size(&mut self) -> usize {
962        self.as_context_mut().concurrent_state_table_size()
963    }
964
965    /// Convenience wrapper for [`StoreContextMut::spawn`].
966    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
967    where
968        T: 'static,
969    {
970        self.as_context_mut().spawn(task)
971    }
972}
973
974impl<T> StoreContextMut<'_, T> {
975    /// Assert that all the relevant tables and queues in the concurrent state
976    /// for this store are empty.
977    ///
978    /// This is for sanity checking in integration tests
979    /// (e.g. `component-async-tests`) that the relevant state has been cleared
980    /// after each test concludes.  This should help us catch leaks, e.g. guest
981    /// tasks which haven't been deleted despite having completed and having
982    /// been dropped by their supertasks.
983    ///
984    /// Only intended for use in Wasmtime's own testing.
985    #[doc(hidden)]
986    pub fn assert_concurrent_state_empty(self) {
987        let store = self.0;
988        store
989            .store_data_mut()
990            .components
991            .assert_instance_states_empty();
992        let state = store.concurrent_state_mut();
993        assert!(
994            state.table.get_mut().is_empty(),
995            "non-empty table: {:?}",
996            state.table.get_mut()
997        );
998        assert!(state.high_priority.is_empty());
999        assert!(state.low_priority.is_empty());
1000        assert!(state.current_thread.is_none());
1001        assert!(state.futures_mut().unwrap().is_empty());
1002        assert!(state.global_error_context_ref_counts.is_empty());
1003    }
1004
1005    /// Helper function to perform tests over the size of the concurrent state
1006    /// table which can be useful for detecting leaks.
1007    ///
1008    /// Only intended for use in Wasmtime's own testing.
1009    #[doc(hidden)]
1010    pub fn concurrent_state_table_size(&mut self) -> usize {
1011        self.0
1012            .concurrent_state_mut()
1013            .table
1014            .get_mut()
1015            .iter_mut()
1016            .count()
1017    }
1018
1019    /// Spawn a background task to run as part of this instance's event loop.
1020    ///
1021    /// The task will receive an `&Accessor<U>` and run concurrently with
1022    /// any other tasks in progress for the instance.
1023    ///
1024    /// Note that the task will only make progress if and when the event loop
1025    /// for this instance is run.
1026    ///
1027    /// The returned [`JoinHandle`] may be used to cancel the task.
1028    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
1029    where
1030        T: 'static,
1031    {
1032        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1033        self.spawn_with_accessor(accessor, task)
1034    }
1035
1036    /// Internal implementation of `spawn` functions where a `store` is
1037    /// available along with an `Accessor`.
1038    fn spawn_with_accessor<D>(
1039        self,
1040        accessor: Accessor<T, D>,
1041        task: impl AccessorTask<T, D>,
1042    ) -> JoinHandle
1043    where
1044        T: 'static,
1045        D: HasData + ?Sized,
1046    {
1047        // Create an "abortable future" here where internally the future will
1048        // hook calls to poll and possibly spawn more background tasks on each
1049        // iteration.
1050        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1051        self.0
1052            .concurrent_state_mut()
1053            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1054        handle
1055    }
1056
1057    /// Run the specified closure `fun` to completion as part of this store's
1058    /// event loop.
1059    ///
1060    /// This will run `fun` as part of this store's event loop until it
1061    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1062    /// controlled access to the store and its data.
1063    ///
1064    /// This function can be used to invoke [`Func::call_concurrent`] for
1065    /// example within the async closure provided here.
1066    ///
1067    /// This function will unconditionally return an error if
1068    /// [`Config::concurrency_support`] is disabled.
1069    ///
1070    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1071    ///
1072    /// # Store-blocking behavior
1073    ///
1074    /// At this time there are certain situations in which the `Future` returned
1075    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1076    /// extended period of time, despite one or more `Waker::wake` events having
1077    /// occurred for the task to which it belongs.  This can manifest as the
1078    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1079    /// the `Store` being held by e.g. a blocking host function, preventing the
1080    /// `Future` from being polled. A canonical example of this is when the
1081    /// `fun` provided to this function attempts to set a timeout for an
1082    /// invocation of a wasm function. In this situation the async closure is
1083    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1084    /// to elapse. At this time this setup will not always work and the timeout
1085    /// may not reliably fire.
1086    ///
1087    /// This function will not block the current thread and as such is always
1088    /// suitable to run in an `async` context, but the current implementation of
1089    /// Wasmtime can lead to situations where a certain wasm computation is
1090    /// required to make progress the closure to make progress. This is an
1091    /// artifact of Wasmtime's historical implementation of `async` functions
1092    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1093    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1094    /// progress for a readiness notification of (b) to get delivered.
1095    ///
1096    /// This effectively means that it's not possible to reliably perform a
1097    /// "select" operation within the `fun` closure, which timeouts for example
1098    /// are based on. Fixing this requires some relatively major refactoring
1099    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1100    /// that is intended to be fixed one day. In the meantime it's recommended
1101    /// to apply timeouts or such to the entire `run_concurrent` call itself
1102    /// rather than internally.
1103    ///
1104    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1105    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1106    ///
1107    /// # Example
1108    ///
1109    /// ```
1110    /// # use {
1111    /// #   wasmtime::{
1112    /// #     error::{Result},
1113    /// #     component::{ Component, Linker, Resource, ResourceTable},
1114    /// #     Config, Engine, Store
1115    /// #   },
1116    /// # };
1117    /// #
1118    /// # struct MyResource(u32);
1119    /// # struct Ctx { table: ResourceTable }
1120    /// #
1121    /// # async fn foo() -> Result<()> {
1122    /// # let mut config = Config::new();
1123    /// # let engine = Engine::new(&config)?;
1124    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1125    /// # let mut linker = Linker::new(&engine);
1126    /// # let component = Component::new(&engine, "")?;
1127    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1128    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1129    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1130    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1131    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1132    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1133    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1134    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1135    ///    Ok(())
1136    /// }).await??;
1137    /// # Ok(())
1138    /// # }
1139    /// ```
1140    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1141    where
1142        T: Send + 'static,
1143    {
1144        ensure!(
1145            self.0.concurrency_support(),
1146            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1147        );
1148        self.do_run_concurrent(fun, false).await
1149    }
1150
1151    pub(super) async fn run_concurrent_trap_on_idle<R>(
1152        self,
1153        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1154    ) -> Result<R>
1155    where
1156        T: Send + 'static,
1157    {
1158        self.do_run_concurrent(fun, true).await
1159    }
1160
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` to obtain a future, then drives
    /// that future with `poll_until`, forwarding `trap_on_idle` to it.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard which owns both the store context and the future returned by
        // `fun`.  The future is kept in a `ManuallyDrop` so that its
        // destructor can be run explicitly, inside `tls::set`, when the guard
        // is dropped.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                // NOTE(review): presumably the drop happens inside `tls::set`
                // so that destructors run by the future can reach the store
                // via `tls::get` -- confirm against the `tls` module.
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        // The guard is only ever accessed through this `&mut` reference, so
        // the future stored in it stays at a fixed address for as long as it
        // is pinned below.
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        // Run the event loop until the future completes (or, if
        // `trap_on_idle` is set, until no further progress is possible).
        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1204
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration of the loop polls `future`, polls the host/background
    /// tasks in `ConcurrentState::futures`, and then handles either all
    /// queued high-priority work items or a single low-priority one.  An error
    /// from a host task or from handling a work item is returned immediately.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts the `FuturesUnordered` taken out of the store back
        // into `ConcurrentState::futures` when dropped, so it is restored
        // even on an early return.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Outcome of a single pass through the `poll_fn` below.
            enum PollResult<R> {
                // The future passed to `poll_until` completed with this value.
                Complete(R),
                // Work items to handle before looping again (possibly none);
                // `low_priority` is true when `ready` holds a single item
                // taken from the low-priority queue.
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if
                // any.  This will be either all of the high-priority work
                // items, or if there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard which cleans up any work items from this batch
                    // that were never handled, e.g. because handling an
                    // earlier item returned an error.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    // Fibers are explicitly disposed of using
                                    // the store.
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Futures are dropped inside `tls::set`.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    // Other items get no special treatment
                                    // and are simply dropped.
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // If we're about to run a low-priority task, first yield to
                    // the executor.  This ensures that it won't be starved of
                    // the ability to e.g. update the readiness of sockets,
                    // etc. which the guest may be using `thread.yield` along
                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
                    //
                    // This works for e.g. `thread.yield` and callbacks which
                    // return `CALLBACK_CODE_YIELD` because we queue a low
                    // priority item to resume the task (i.e. resume the thread
                    // or call the callback, respectively) just prior to
                    // suspending it.  Indeed, as of this writing those are the
                    // _only_ situations we queue low-priority tasks.
                    // Therefore, we interpret the guest's request to yield as
                    // meaning "yield to other guest tasks _and_/_or_ host
                    // operations such as updating socket readiness", the latter
                    // being the async runtime's responsibility.
                    //
                    // In the future, if this ends up causing measurable
                    // performance issues, this could be optimized such that we
                    // only yield periodically (e.g. for batches of low priority
                    // items) and not for each and every individual item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    // Handle the batch in order; an error here returns early
                    // and leaves the remaining items to `Dispose::drop`.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1419
1420    /// Handle the specified work item, possibly resuming a fiber if applicable.
1421    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1422    where
1423        T: Send,
1424    {
1425        log::trace!("handle work item {item:?}");
1426        match item {
1427            WorkItem::PushFuture(future) => {
1428                self.0
1429                    .concurrent_state_mut()
1430                    .futures_mut()?
1431                    .push(future.into_inner());
1432            }
1433            WorkItem::ResumeFiber(fiber) => {
1434                self.0.resume_fiber(fiber).await?;
1435            }
1436            WorkItem::ResumeThread(_, thread) => {
1437                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1438                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1439                    GuestThreadState::Running,
1440                ) {
1441                    self.0.resume_fiber(fiber).await?;
1442                } else {
1443                    bail_bug!("cannot resume non-pending thread {thread:?}");
1444                }
1445            }
1446            WorkItem::GuestCall(_, call) => {
1447                if call.is_ready(self.0)? {
1448                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1449                } else {
1450                    let state = self.0.concurrent_state_mut();
1451                    let task = state.get_mut(call.thread.task)?;
1452                    if !task.starting_sent {
1453                        task.starting_sent = true;
1454                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1455                            Waitable::Guest(call.thread.task).set_event(
1456                                state,
1457                                Some(Event::Subtask {
1458                                    status: Status::Starting,
1459                                }),
1460                            )?;
1461                        }
1462                    }
1463
1464                    let instance = state.get_mut(call.thread.task)?.instance;
1465                    self.0
1466                        .instance_state(instance)
1467                        .concurrent_state()
1468                        .pending
1469                        .insert(call.thread, call.kind);
1470                }
1471            }
1472            WorkItem::WorkerFunction(fun) => {
1473                self.run_on_worker(WorkerItem::Function(fun)).await?;
1474            }
1475        }
1476
1477        Ok(())
1478    }
1479
1480    /// Execute the specified guest call on a worker fiber.
1481    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1482    where
1483        T: Send,
1484    {
1485        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1486            fiber
1487        } else {
1488            fiber::make_fiber(self.0, move |store| {
1489                loop {
1490                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1491                        bail_bug!("worker_item not present when resuming fiber")
1492                    };
1493                    match item {
1494                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1495                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1496                    }
1497
1498                    store.suspend(SuspendReason::NeedWork)?;
1499                }
1500            })?
1501        };
1502
1503        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1504        assert!(worker_item.is_none());
1505        *worker_item = Some(item);
1506
1507        self.0.resume_fiber(worker).await
1508    }
1509
1510    /// Wrap the specified host function in a future which will call it, passing
1511    /// it an `&Accessor<T>`.
1512    ///
1513    /// See the `Accessor` documentation for details.
1514    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1515    where
1516        T: 'static,
1517        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1518            + Send
1519            + Sync
1520            + 'static,
1521        R: Send + Sync + 'static,
1522    {
1523        let token = StoreToken::new(self);
1524        async move {
1525            let mut accessor = Accessor::new(token);
1526            closure(&mut accessor).await
1527        }
1528    }
1529}
1530
1531impl StoreOpaque {
1532    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1533    /// guest-to-guest call or a sync host-to-guest call.
1534    ///
1535    /// This task will only be used for the purpose of handling calls to
1536    /// intrinsic functions; both parameter lowering and result lifting are
1537    /// assumed to be taken care of elsewhere.
1538    pub(crate) fn enter_guest_sync_call(
1539        &mut self,
1540        guest_caller: Option<RuntimeInstance>,
1541        callee_async: bool,
1542        callee: RuntimeInstance,
1543    ) -> Result<()> {
1544        log::trace!("enter sync call {callee:?}");
1545        if !self.concurrency_support() {
1546            return self.enter_call_not_concurrent();
1547        }
1548
1549        let state = self.concurrent_state_mut();
1550        let thread = state.current_thread;
1551        let instance = if let Some(thread) = thread.guest() {
1552            Some(state.get_mut(thread.task)?.instance)
1553        } else {
1554            None
1555        };
1556        if guest_caller.is_some() {
1557            debug_assert_eq!(instance, guest_caller);
1558        }
1559        let guest_thread = GuestTask::new(
1560            state,
1561            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1562            LiftResult {
1563                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1564                ty: TypeTupleIndex::reserved_value(),
1565                memory: None,
1566                string_encoding: StringEncoding::Utf8,
1567            },
1568            if let Some(thread) = thread.guest() {
1569                Caller::Guest { thread: *thread }
1570            } else {
1571                Caller::Host {
1572                    tx: None,
1573                    host_future_present: false,
1574                    caller: thread,
1575                }
1576            },
1577            None,
1578            callee,
1579            callee_async,
1580        )?;
1581
1582        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1583            guest_thread.thread,
1584            self,
1585            callee.index,
1586        )?;
1587        self.set_thread(guest_thread)?;
1588
1589        Ok(())
1590    }
1591
1592    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1593    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1594        if !self.concurrency_support() {
1595            return Ok(self.exit_call_not_concurrent());
1596        }
1597        let thread = match self.set_thread(CurrentThread::None)?.guest() {
1598            Some(t) => *t,
1599            None => bail_bug!("expected task when exiting"),
1600        };
1601        let task = self.concurrent_state_mut().get_mut(thread.task)?;
1602        let instance = task.instance;
1603        let caller = match &task.caller {
1604            &Caller::Guest { thread } => thread.into(),
1605            &Caller::Host { caller, .. } => caller,
1606        };
1607        task.lift_result = None;
1608        task.exited = true;
1609        self.set_thread(caller)?;
1610
1611        log::trace!("exit sync call {instance:?}");
1612        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1613
1614        Ok(())
1615    }
1616
1617    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1618    /// transition to the host.
1619    ///
1620    /// FIXME: this is called for all guest->host transitions and performs some
1621    /// relatively expensive table manipulations. This would ideally be
1622    /// optimized to avoid the full allocation of a `HostTask` in at least some
1623    /// situations.
1624    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1625        if !self.concurrency_support() {
1626            self.enter_call_not_concurrent()?;
1627            return Ok(None);
1628        }
1629        let state = self.concurrent_state_mut();
1630        let caller = state.current_guest_thread()?;
1631        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1632        log::trace!("new host task {task:?}");
1633        self.set_thread(task)?;
1634        Ok(Some(task))
1635    }
1636
1637    /// Invoked before lowering the results of a host task to the guest.
1638    ///
1639    /// This is used to update the current thread annotations within the store
1640    /// to ensure that it reflects the guest task, not the host task, since
1641    /// lowering may execute guest code.
1642    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1643        if !self.concurrency_support() {
1644            return Ok(());
1645        }
1646        let task = self.concurrent_state_mut().current_host_thread()?;
1647        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1648        self.set_thread(caller)?;
1649        Ok(())
1650    }
1651
1652    /// Dual of `host_task_create` and signifies that the host has finished and
1653    /// will be cleaned up.
1654    ///
1655    /// Note that this isn't invoked when the host is invoked asynchronously and
1656    /// the host isn't complete yet. In that situation the host task persists
1657    /// and will be cleaned up separately in `subtask_drop`
1658    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1659        match task {
1660            Some(task) => {
1661                log::trace!("delete host task {task:?}");
1662                self.concurrent_state_mut().delete(task)?;
1663            }
1664            None => {
1665                self.exit_call_not_concurrent();
1666            }
1667        }
1668        Ok(())
1669    }
1670
1671    /// Determine whether the specified instance may be entered from the host.
1672    ///
1673    /// We return `true` here only if all of the following hold:
1674    ///
1675    /// - The top-level instance is not already on the current task's call stack.
1676    /// - The instance is not in need of a post-return function call.
1677    /// - `self` has not been poisoned due to a trap.
1678    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1679        if self.trapped() {
1680            return Ok(false);
1681        }
1682        if !self.concurrency_support() {
1683            return Ok(true);
1684        }
1685        let state = self.concurrent_state_mut();
1686        let mut cur = state.current_thread;
1687        loop {
1688            match cur {
1689                CurrentThread::None => break Ok(true),
1690                CurrentThread::Guest(thread) => {
1691                    let task = state.get_mut(thread.task)?;
1692
1693                    // Note that we only compare top-level instance IDs here.
1694                    // The idea is that the host is not allowed to recursively
1695                    // enter a top-level instance even if the specific leaf
1696                    // instance is not on the stack. This the behavior defined
1697                    // in the spec, and it allows us to elide runtime checks in
1698                    // guest-to-guest adapters.
1699                    if task.instance.instance == instance.instance {
1700                        break Ok(false);
1701                    }
1702                    cur = match task.caller {
1703                        Caller::Host { caller, .. } => caller,
1704                        Caller::Guest { thread } => thread.into(),
1705                    };
1706                }
1707                CurrentThread::Host(id) => {
1708                    cur = state.get_mut(id)?.caller.into();
1709                }
1710            }
1711        }
1712    }
1713
1714    /// Helper function to retrieve the `InstanceState` for the
1715    /// specified instance.
1716    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1717        self.component_instance_mut(instance.instance)
1718            .instance_state(instance.index)
1719    }
1720
1721    /// Configure the currently running `thread`.
1722    ///
1723    /// This will save off any state necessary for the previous thread, if
1724    /// applicable, and then it'll additionally update state for `thread` if
1725    /// needed too.
1726    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1727        let thread = thread.into();
1728        let state = self.concurrent_state_mut();
1729        let old_thread = mem::replace(&mut state.current_thread, thread);
1730
1731        // First thing to do after swapping threads is updating the context
1732        // slots for this thread within the store. This restores the behavior of
1733        // `context.{get,set}`. This involves taking the old state out of the
1734        // store, saving it in the thread that's being swapped from, and doing
1735        // the inverse for the new thread. When debug assertions are enabled
1736        // this also leaves behind sentinel values to try to uncover bugs where
1737        // this may be forgotten.
1738        if let Some(old_thread) = old_thread.guest() {
1739            let old_context = self.vm_store_context().component_context;
1740            self.concurrent_state_mut()
1741                .get_mut(old_thread.thread)?
1742                .context = old_context;
1743        }
1744        if cfg!(debug_assertions) {
1745            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1746        }
1747        if let Some(thread) = thread.guest() {
1748            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
1749            let context = thread.context;
1750            if cfg!(debug_assertions) {
1751                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1752            }
1753            self.vm_store_context_mut().component_context = context;
1754        }
1755
1756        // Each time we switch threads, we conservatively set `task_may_block`
1757        // to `false` for the component instance we're switching away from (if
1758        // any), meaning it will be `false` for any new thread created for that
1759        // instance unless explicitly set otherwise.
1760        //
1761        // Additionally if we're switching to a new thread, set its component
1762        // instance's `task_may_block` according to where it left off.
1763        let state = self.concurrent_state_mut();
1764        if let Some(old_thread) = old_thread.guest() {
1765            let instance = state.get_mut(old_thread.task)?.instance.instance;
1766            self.component_instance_mut(instance)
1767                .set_task_may_block(false)
1768        }
1769
1770        if thread.guest().is_some() {
1771            self.set_task_may_block()?;
1772        }
1773
1774        Ok(old_thread)
1775    }
1776
1777    /// Set the global variable representing whether the current task may block
1778    /// prior to entering Wasm code.
1779    fn set_task_may_block(&mut self) -> Result<()> {
1780        let state = self.concurrent_state_mut();
1781        let guest_thread = state.current_guest_thread()?;
1782        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1783        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1784        self.component_instance_mut(instance)
1785            .set_task_may_block(may_block);
1786        Ok(())
1787    }
1788
1789    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1790        if !self.concurrency_support() {
1791            return Ok(());
1792        }
1793        let state = self.concurrent_state_mut();
1794        let task = state.current_guest_thread()?.task;
1795        let instance = state.get_mut(task)?.instance.instance;
1796        let task_may_block = self.component_instance(instance).get_task_may_block();
1797
1798        if task_may_block {
1799            Ok(())
1800        } else {
1801            Err(Trap::CannotBlockSyncTask.into())
1802        }
1803    }
1804
1805    /// Record that we're about to enter a (sub-)component instance which does
1806    /// not support more than one concurrent, stackful activation, meaning it
1807    /// cannot be entered again until the next call returns.
1808    fn enter_instance(&mut self, instance: RuntimeInstance) {
1809        log::trace!("enter {instance:?}");
1810        self.instance_state(instance)
1811            .concurrent_state()
1812            .do_not_enter = true;
1813    }
1814
1815    /// Record that we've exited a (sub-)component instance previously entered
1816    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1817    /// See the documentation for the latter for details.
1818    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1819        log::trace!("exit {instance:?}");
1820        self.instance_state(instance)
1821            .concurrent_state()
1822            .do_not_enter = false;
1823        self.partition_pending(instance)
1824    }
1825
1826    /// Iterate over `InstanceState::pending`, moving any ready items into the
1827    /// "high priority" work item queue.
1828    ///
1829    /// See `GuestCall::is_ready` for details.
1830    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1831        for (thread, kind) in
1832            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1833        {
1834            let call = GuestCall { thread, kind };
1835            if call.is_ready(self)? {
1836                self.concurrent_state_mut()
1837                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
1838            } else {
1839                self.instance_state(instance)
1840                    .concurrent_state()
1841                    .pending
1842                    .insert(call.thread, call.kind);
1843            }
1844        }
1845
1846        Ok(())
1847    }
1848
1849    /// Implements the `backpressure.{inc,dec}` intrinsics.
1850    pub(crate) fn backpressure_modify(
1851        &mut self,
1852        caller_instance: RuntimeInstance,
1853        modify: impl FnOnce(u16) -> Option<u16>,
1854    ) -> Result<()> {
1855        let state = self.instance_state(caller_instance).concurrent_state();
1856        let old = state.backpressure;
1857        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1858        state.backpressure = new;
1859
1860        if old > 0 && new == 0 {
1861            // Backpressure was previously enabled and is now disabled; move any
1862            // newly-eligible guest calls to the "high priority" queue.
1863            self.partition_pending(caller_instance)?;
1864        }
1865
1866        Ok(())
1867    }
1868
1869    /// Resume the specified fiber, giving it exclusive access to the specified
1870    /// store.
1871    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1872        let old_thread = self.concurrent_state_mut().current_thread;
1873        log::trace!("resume_fiber: save current thread {old_thread:?}");
1874
1875        let fiber = fiber::resolve_or_release(self, fiber).await?;
1876
1877        self.set_thread(old_thread)?;
1878
1879        let state = self.concurrent_state_mut();
1880
1881        if let Some(ot) = old_thread.guest() {
1882            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1883        }
1884        log::trace!("resume_fiber: restore current thread {old_thread:?}");
1885
1886        if let Some(mut fiber) = fiber {
1887            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1888            // See the `SuspendReason` documentation for what each case means.
1889            let reason = match state.suspend_reason.take() {
1890                Some(r) => r,
1891                None => bail_bug!("suspend reason missing when resuming fiber"),
1892            };
1893            match reason {
1894                SuspendReason::NeedWork => {
1895                    if state.worker.is_none() {
1896                        state.worker = Some(fiber);
1897                    } else {
1898                        fiber.dispose(self);
1899                    }
1900                }
1901                SuspendReason::Yielding {
1902                    thread,
1903                    cancellable,
1904                    ..
1905                } => {
1906                    state.get_mut(thread.thread)?.state =
1907                        GuestThreadState::Ready { fiber, cancellable };
1908                    let instance = state.get_mut(thread.task)?.instance.index;
1909                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1910                }
1911                SuspendReason::ExplicitlySuspending { thread, .. } => {
1912                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1913                }
1914                SuspendReason::Waiting { set, thread, .. } => {
1915                    let old = state
1916                        .get_mut(set)?
1917                        .waiting
1918                        .insert(thread, WaitMode::Fiber(fiber));
1919                    assert!(old.is_none());
1920                }
1921            };
1922        } else {
1923            log::trace!("resume_fiber: fiber has exited");
1924        }
1925
1926        Ok(())
1927    }
1928
1929    /// Suspend the current fiber, storing the reason in
1930    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1931    /// it should be resumed.
1932    ///
1933    /// See the `SuspendReason` documentation for details.
1934    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1935        log::trace!("suspend fiber: {reason:?}");
1936
1937        // If we're yielding or waiting on behalf of a guest thread, we'll need to
1938        // pop the call context which manages resource borrows before suspending
1939        // and then push it again once we've resumed.
1940        let task = match &reason {
1941            SuspendReason::Yielding { thread, .. }
1942            | SuspendReason::Waiting { thread, .. }
1943            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1944            SuspendReason::NeedWork => None,
1945        };
1946
1947        let old_guest_thread = if task.is_some() {
1948            self.concurrent_state_mut().current_thread
1949        } else {
1950            CurrentThread::None
1951        };
1952
1953        // We should not have reached here unless either there's no current
1954        // task, or the current task is permitted to block.  In addition, we
1955        // special-case `thread.switch-to` and waiting for a subtask to go from
1956        // `starting` to `started`, both of which we consider non-blocking
1957        // operations despite requiring a suspend.
1958        debug_assert!(
1959            matches!(
1960                reason,
1961                SuspendReason::ExplicitlySuspending {
1962                    skip_may_block_check: true,
1963                    ..
1964                } | SuspendReason::Waiting {
1965                    skip_may_block_check: true,
1966                    ..
1967                } | SuspendReason::Yielding {
1968                    skip_may_block_check: true,
1969                    ..
1970                }
1971            ) || old_guest_thread
1972                .guest()
1973                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1974                .transpose()?
1975                .unwrap_or(true)
1976        );
1977
1978        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1979        assert!(suspend_reason.is_none());
1980        *suspend_reason = Some(reason);
1981
1982        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1983
1984        if task.is_some() {
1985            self.set_thread(old_guest_thread)?;
1986        }
1987
1988        Ok(())
1989    }
1990
1991    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1992        let state = self.concurrent_state_mut();
1993
1994        if waitable.common(state)?.set.is_some() {
1995            bail!(Trap::WaitableSyncAndAsync);
1996        }
1997
1998        let caller = state.current_guest_thread()?;
1999        let set = state.get_mut(caller.thread)?.sync_call_set;
2000        waitable.join(state, Some(set))?;
2001        self.suspend(SuspendReason::Waiting {
2002            set,
2003            thread: caller,
2004            skip_may_block_check: false,
2005        })?;
2006        let state = self.concurrent_state_mut();
2007        waitable.join(state, None)
2008    }
2009
2010    /// Cleans up the data structures backing the `guest_thread` specified,
2011    /// removing it from the internal tables of `runtime_instance` as well.
2012    ///
2013    /// This function is used whenever a guest thread has fully exited and
2014    /// completed. This'll clean up the associated `GuestThread` structure and
2015    /// related resources it contains.
2016    ///
2017    /// Other functionality that this implements is:
2018    ///
2019    /// * This will perform conditional cleanup of the `GuestTask` that owns
2020    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
2021    /// * If there are no more threads in the `GuestTask` that this thread is
2022    ///   associated with, and if the task hasn't produced a result (e.g. it's not
2023    ///   returned or cancelled), then a trap will be raised that a result
2024    ///   wasn't ever produced.
2025    /// * If this task is finished, meaning the top-level thread exited and
2026    ///   additionally it's been returned or cancelled, then this will handle
2027    ///   management of the store's "active interesting tasks" counter.
2028    ///
2029    /// Effectively this is intended to be a "narrow waist" through which many
2030    /// destruction operations are funneled through.
2031    fn cleanup_thread(
2032        &mut self,
2033        guest_thread: QualifiedThreadId,
2034        runtime_instance: RuntimeInstance,
2035        cleanup_task: CleanupTask,
2036    ) -> Result<()> {
2037        let state = self.concurrent_state_mut();
2038        let thread_data = state.get_mut(guest_thread.thread)?;
2039        let sync_call_set = thread_data.sync_call_set;
2040        if let Some(guest_id) = thread_data.instance_rep {
2041            self.instance_state(runtime_instance)
2042                .thread_handle_table()
2043                .guest_thread_remove(guest_id)?;
2044        }
2045        let state = self.concurrent_state_mut();
2046
2047        // Clean up any pending subtasks in the sync_call_set
2048        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2049            if let Some(Event::Subtask {
2050                status: Status::Returned | Status::ReturnCancelled,
2051            }) = waitable.common(state)?.event
2052            {
2053                waitable.delete_from(state)?;
2054            }
2055        }
2056
2057        state.delete(guest_thread.thread)?;
2058        state.delete(sync_call_set)?;
2059        let task = state.get_mut(guest_thread.task)?;
2060        task.threads.remove(&guest_thread.thread);
2061
2062        if task.threads.is_empty() && !task.returned_or_cancelled() {
2063            bail!(Trap::NoAsyncResult);
2064        }
2065        let ready_to_delete = task.ready_to_delete();
2066
2067        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2068            task.decremented_interesting_task_count = true;
2069
2070            debug_assert!(state.interesting_tasks > 0);
2071            state.interesting_tasks -= 1;
2072            if state.interesting_tasks == 0
2073                && let Some(waker) = state.interesting_tasks_empty_waker.take()
2074            {
2075                waker.wake();
2076            }
2077        }
2078
2079        match cleanup_task {
2080            CleanupTask::Yes => {
2081                if ready_to_delete {
2082                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2083                }
2084            }
2085            CleanupTask::No => {}
2086        }
2087
2088        Ok(())
2089    }
2090
2091    /// Performs cancellation of the `guest_task` specified with the
2092    /// precondition that the task hasn't lowered its parameters.
2093    ///
2094    /// In this situation the task hasn't ever been started meaning it hasn't
2095    /// actually run any wasm code yet. This requires cleaning up metadata such
2096    /// as thread information attached to the task.
2097    ///
2098    /// The main two entrypoints for this function are:
2099    ///
2100    /// * Task cancellation via `subtask.cancel`, the intrinsic.
2101    /// * Dropping a host `call_async` future which needs to cancel the task
2102    ///   because it cannot reference its parameters any more.
2103    fn cancel_guest_subtask_without_lowered_parameters(
2104        &mut self,
2105        caller_instance: RuntimeInstance,
2106        guest_task: TableId<GuestTask>,
2107    ) -> Result<()> {
2108        let concurrent_state = self.concurrent_state_mut();
2109        let task = concurrent_state.get_mut(guest_task)?;
2110        assert!(!task.already_lowered_parameters());
2111        // The task is in a `starting` state, meaning it hasn't run at
2112        // all yet.  Here we update its fields to indicate that it is
2113        // ready to delete immediately once `subtask.drop` is called.
2114        task.lower_params = None;
2115        task.lift_result = None;
2116        task.exited = true;
2117        let instance = task.instance;
2118
2119        // Clean up the thread within this task as it's now never going
2120        // to run.
2121        assert_eq!(1, task.threads.len());
2122        let thread = *task.threads.iter().next().unwrap();
2123        self.cleanup_thread(
2124            QualifiedThreadId {
2125                task: guest_task,
2126                thread,
2127            },
2128            caller_instance,
2129            CleanupTask::No,
2130        )?;
2131
2132        // Not yet started; cancel and remove from pending
2133        let pending = &mut self.instance_state(instance).concurrent_state().pending;
2134        let pending_count = pending.len();
2135        pending.retain(|thread, _| thread.task != guest_task);
2136        // If there were no pending threads for this task, we're in an error state
2137        if pending.len() == pending_count {
2138            bail!(Trap::SubtaskCancelAfterTerminal);
2139        }
2140        Ok(())
2141    }
2142}
2143
/// Whether `StoreOpaque::cleanup_thread` should also delete the `GuestTask`
/// which owns the thread being cleaned up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CleanupTask {
    /// Delete the owning task as well, provided it is ready to be deleted.
    Yes,
    /// Leave the owning task in place.
    No,
}
2148
2149impl Instance {
2150    /// Get the next pending event for the specified task and (optional)
2151    /// waitable set, along with the waitable handle if applicable.
2152    fn get_event(
2153        self,
2154        store: &mut StoreOpaque,
2155        guest_task: TableId<GuestTask>,
2156        set: Option<TableId<WaitableSet>>,
2157        cancellable: bool,
2158    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2159        let state = store.concurrent_state_mut();
2160
2161        let event = &mut state.get_mut(guest_task)?.event;
2162        if let Some(ev) = event
2163            && (cancellable || !matches!(ev, Event::Cancelled))
2164        {
2165            log::trace!("deliver event {ev:?} to {guest_task:?}");
2166            let ev = *ev;
2167            *event = None;
2168            return Ok(Some((ev, None)));
2169        }
2170
2171        let set = match set {
2172            Some(set) => set,
2173            None => return Ok(None),
2174        };
2175        let waitable = match state.get_mut(set)?.ready.pop_first() {
2176            Some(v) => v,
2177            None => return Ok(None),
2178        };
2179
2180        let common = waitable.common(state)?;
2181        let handle = match common.handle {
2182            Some(h) => h,
2183            None => bail_bug!("handle not set when delivering event"),
2184        };
2185        let event = match common.event.take() {
2186            Some(e) => e,
2187            None => bail_bug!("event not set when delivering event"),
2188        };
2189
2190        log::trace!(
2191            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2192        );
2193
2194        waitable.on_delivery(store, self, event)?;
2195
2196        Ok(Some((event, Some((waitable, handle)))))
2197    }
2198
2199    /// Handle the `CallbackCode` returned from an async-lifted export or its
2200    /// callback.
2201    ///
2202    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2203    /// using `handle_guest_call`.
2204    fn handle_callback_code(
2205        self,
2206        store: &mut StoreOpaque,
2207        guest_thread: QualifiedThreadId,
2208        runtime_instance: RuntimeComponentInstanceIndex,
2209        code: u32,
2210    ) -> Result<Option<GuestCall>> {
2211        let (code, set) = unpack_callback_code(code);
2212
2213        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2214
2215        let state = store.concurrent_state_mut();
2216
2217        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2218            let set = store
2219                .instance_state(self.runtime_instance(runtime_instance))
2220                .handle_table()
2221                .waitable_set_rep(handle)?;
2222
2223            Ok(TableId::<WaitableSet>::new(set))
2224        };
2225
2226        Ok(match code {
2227            callback_code::EXIT => {
2228                log::trace!("implicit thread {guest_thread:?} completed");
2229                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2230                task.exited = true;
2231                task.callback = None;
2232                store.cleanup_thread(
2233                    guest_thread,
2234                    self.runtime_instance(runtime_instance),
2235                    CleanupTask::Yes,
2236                )?;
2237                None
2238            }
2239            callback_code::YIELD => {
2240                let task = state.get_mut(guest_thread.task)?;
2241                // If an `Event::Cancelled` is pending, we'll deliver that;
2242                // otherwise, we'll deliver `Event::None`.  Note that
2243                // `GuestTask::event` is only ever set to one of those two
2244                // `Event` variants.
2245                if let Some(event) = task.event {
2246                    assert!(matches!(event, Event::None | Event::Cancelled));
2247                } else {
2248                    task.event = Some(Event::None);
2249                }
2250                let call = GuestCall {
2251                    thread: guest_thread,
2252                    kind: GuestCallKind::DeliverEvent {
2253                        instance: self,
2254                        set: None,
2255                    },
2256                };
2257                if state.may_block(guest_thread.task)? {
2258                    // Push this thread onto the "low priority" queue so it runs
2259                    // after any other threads have had a chance to run.
2260                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2261                    None
2262                } else {
2263                    // Yielding in a non-blocking context is defined as a no-op
2264                    // according to the spec, so we must run this thread
2265                    // immediately without allowing any others to run.
2266                    Some(call)
2267                }
2268            }
2269            callback_code::WAIT => {
2270                // The task may only return `WAIT` if it was created for a call
2271                // to an async export).  Otherwise, we'll trap.
2272                state.check_blocking_for(guest_thread.task)?;
2273
2274                let set = get_set(store, set)?;
2275                let state = store.concurrent_state_mut();
2276
2277                if state.get_mut(guest_thread.task)?.event.is_some()
2278                    || !state.get_mut(set)?.ready.is_empty()
2279                {
2280                    // An event is immediately available; deliver it ASAP.
2281                    state.push_high_priority(WorkItem::GuestCall(
2282                        runtime_instance,
2283                        GuestCall {
2284                            thread: guest_thread,
2285                            kind: GuestCallKind::DeliverEvent {
2286                                instance: self,
2287                                set: Some(set),
2288                            },
2289                        },
2290                    ));
2291                } else {
2292                    // No event is immediately available.
2293                    //
2294                    // We're waiting, so register to be woken up when an event
2295                    // is published for this waitable set.
2296                    //
2297                    // Here we also set `GuestTask::wake_on_cancel` which allows
2298                    // `subtask.cancel` to interrupt the wait.
2299                    let old = state
2300                        .get_mut(guest_thread.thread)?
2301                        .wake_on_cancel
2302                        .replace(set);
2303                    if !old.is_none() {
2304                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2305                    }
2306                    let old = state
2307                        .get_mut(set)?
2308                        .waiting
2309                        .insert(guest_thread, WaitMode::Callback(self));
2310                    if !old.is_none() {
2311                        bail_bug!("set's waiting set already had this thread registered");
2312                    }
2313                }
2314                None
2315            }
2316            _ => bail!(Trap::UnsupportedCallbackCode),
2317        })
2318    }
2319
    /// Add the specified guest call to the "high priority" work item queue, to
    /// be started as soon as backpressure and/or reentrance rules allow.
    ///
    /// This function does not invoke `callee` itself.  It packages the call
    /// into a boxed closure and pushes it as a
    /// `WorkItem::GuestCall(.., GuestCallKind::StartImplicit(..))`.  The
    /// closure takes one of two shapes:
    ///
    /// - If `callback` is `Some` (which requires `async_` to be `true`), the
    ///   callee is run, its first result is interpreted as a packed callback
    ///   code, and that code is passed to `Self::handle_callback_code`.
    /// - Otherwise the callee is run and, if `!async_` (a sync-lifted export),
    ///   the result is lifted, the post-return function is called, and the
    ///   task is completed with `Status::Returned`.  In either case the task
    ///   is then marked as exited and the implicit thread is cleaned up.
    ///
    /// Parameters:
    ///
    /// - `guest_thread`: the thread (and its task) on whose behalf the call is
    ///   made; it becomes the store's current thread while the callee runs.
    /// - `callee`: the guest function to call.
    /// - `param_count` / `result_count`: the number of `ValRaw` slots used for
    ///   lowered parameters and for results, respectively.
    /// - `async_`: `false` for a sync-lifted export, `true` otherwise.
    /// - `callback`: only its presence is inspected here, to select between
    ///   the two closure shapes described above.
    /// - `post_return`: optional post-return function, used only on the
    ///   sync-lifted path.
    ///
    /// SAFETY: The raw pointer arguments must be valid references to guest
    /// functions (with the appropriate signatures) when the closures queued by
    /// this function are called.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Return a closure which will call the specified function in the scope
        /// of the specified task.
        ///
        /// This will use `GuestTask::lower_params` to lower the parameters, but
        /// will not lift the result; instead, it returns a
        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
        /// any, may be lifted.  Note that an async-lifted export will have
        /// returned its result using the `task.return` intrinsic (or not
        /// returned a result at all, in the case of `task.cancel`), in which
        /// case the "result" of this call will either be a callback code or
        /// nothing.
        ///
        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
        /// the returned closure is called.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            // Capture a token now so the closure can recover a typed
            // `StoreContextMut<T>` from the `&mut dyn VMStore` it is handed.
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                // Shared buffer: parameters are lowered into it, and the callee
                // writes its results back into the same slots.
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // `lower_params` is taken (not borrowed), so it can only be
                // used once per task.
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                // SAFETY: Per the contract documented in `make_call`'s
                // documentation, `callee` must be a valid pointer.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        // The slice must be large enough for both the
                        // parameters going in and the results coming out.
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        // SAFETY: Per the contract described in this function documentation,
        // the `callee` pointer which `call` closes over must be valid when
        // called by the closure we queue below.
        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        // The instance recorded on the task is the one the callee belongs to.
        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // A callback implies an async-lifted export.
            assert!(async_);

            // Stackless (callback-based) async-lifted export.
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                // NOTE(review): `call` is a safe closure, so the remarks below
                // describe the contract behind it rather than justifying an
                // `unsafe` block at this call site.
                //
                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                // Restore whichever thread was current before the call and, if
                // it was a guest thread, mark it as running again.
                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // SAFETY: `wasmparser` will have validated that the callback
                // function returns a `i32` result.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            // Either a sync-lifted export (`!async_`) or a callback-less
            // (stackful) async-lifted export (`async_`).
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                // Fetched up front; only consumed by `call_post_return` on the
                // sync-lifted path below.
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Unless this is a callback-less (i.e. stackful)
                // async-lifted export, we need to record that the instance
                // cannot be entered until the call returns.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                // NOTE(review): `call` is a safe closure, so the remarks below
                // describe the contract behind it rather than justifying an
                // `unsafe` block at this call site.
                //
                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                if !async_ {
                    // This is a sync-lifted export, so now is when we lift the
                    // result, optionally call the post-return function, if any,
                    // and finally notify any current or future waiters that the
                    // subtask has returned.

                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        // `lift_result` is taken (not borrowed), so the result
                        // can only be lifted once per task.
                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // SAFETY: `result_count` represents the number of core Wasm
                    // results returned, per `wasmparser`.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // The post-return function receives the single core result
                    // if there is one, or a zero placeholder if there is none.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        // SAFETY: `result_count` represents the number of
                        // core Wasm results returned, per `wasmparser`.
                        1 => unsafe { storage[0].assume_init() },
                        // NOTE(review): presumably a sync-lifted export never
                        // has more than one flat result -- confirm against
                        // `MAX_FLAT_RESULTS`.
                        _ => unreachable!(),
                    };

                    // SAFETY(review): no justification is given in the
                    // original; presumably this relies on `queue_call`'s
                    // contract that `post_return`, if present, is a valid
                    // guest function of the appropriate signature -- confirm.
                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                store.set_thread(old_thread)?;

                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .exited = true;

                // This is a callback-less call, so the implicit thread has now completed
                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
                Ok(None)
            })
        };

        // Hand the closure to the event loop; it decides when the call starts.
        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2560
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// Concretely, this creates a new `GuestTask` whose parameter-lowering
    /// closure calls `start` and whose result-lifting closure calls `return_`,
    /// records the current guest thread as the new task's caller, and then
    /// makes the new task's thread the store's current thread.
    ///
    /// Parameters:
    ///
    /// - `caller_instance` / `callee_instance`: the component instances on
    ///   either side of the call.
    /// - `task_return_type`, `memory`, `string_encoding`: stored in the new
    ///   task's `LiftResult`; `memory` may be null, in which case no memory is
    ///   recorded.
    /// - `callee_async`: whether the callee is an async-typed function.  A
    ///   sync-lowered call to such a function is only allowed from a context
    ///   where blocking is permitted; otherwise this returns an error.
    /// - `caller_info`: the caller's flat parameters, plus either the
    ///   caller's flat result count (`CallerInfo::Sync`) or whether it expects
    ///   a result (`CallerInfo::Async`).
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: StringEncoding,
        caller_info: CallerInfo,
    ) -> Result<()> {
        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // Where the caller expects to receive the callee's result.
        enum ResultInfo {
            // Via a pointer which the caller passed as its last parameter.
            Heap { results: u32 },
            // As `result_count` flat values.
            Stack { result_count: u32 },
        }

        let result_info = match &caller_info {
            // Async caller with a result: the last parameter is the return
            // pointer.
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("retptr missing"),
                },
            },
            // Async caller without a result: nothing to hand back.
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            // Sync caller with more results than fit in flat form: the last
            // parameter is a pointer to where they should be written.
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("arg ptr missing"),
                },
            },
            // Sync caller whose results fit in flat form.
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(start);
        let return_ = SendSyncPtr::new(return_);
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        // The thread which is current right now is the caller of the new task.
        let old_thread = state.current_guest_thread()?;

        debug_assert_eq!(
            state.get_mut(old_thread.task)?.instance,
            self.runtime_instance(caller_instance)
        );

        let guest_thread = GuestTask::new(
            state,
            // Parameter-lowering closure: runs `start` on the caller's flat
            // parameters and copies what it leaves behind into `dst`.
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes `count` parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                // `start` wrote its results over the front of `src`; those are
                // the callee's flat parameters.
                dst.copy_from_slice(&src[..dst.len()]);
                // Record a "started" event on the current guest thread's task.
                let state = store.0.concurrent_state_mut();
                Waitable::Guest(state.current_guest_thread()?.task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                // Result-lifting closure: runs `return_` on the callee's flat
                // results (plus the caller's result pointer, if any).
                lift: Box::new(move |store, src| {
                    // NOTE(review): the line below is not itself `unsafe`; the
                    // remark presumably refers to the `unsafe` call to
                    // `return_` further down -- confirm.
                    //
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }

                    // Execute the `return_` hook, generated by Wasmtime's FACT
                    // compiler, in the context of the old thread. The old
                    // thread, this thread's caller, may have `realloc`
                    // callbacks invoked for example and those need the correct
                    // context set for the current thread.
                    let prev = store.0.set_thread(old_thread)?;

                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }

                    // Restore the previous current thread after the
                    // lifting/lowering has returned.
                    store.0.set_thread(prev)?;

                    let state = store.0.concurrent_state_mut();
                    let thread = state.current_guest_thread()?;
                    if sync_caller {
                        // For a sync caller, stash the flat result (if any) on
                        // the current task so it can be picked up later.
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    // NOTE(review): relies on `Stack` never
                                    // carrying more than one result,
                                    // presumably because `MAX_FLAT_RESULTS`
                                    // is 1 -- confirm.
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    // The result itself went through `return_`; only a
                    // placeholder value is returned from this closure.
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding,
            },
            Caller::Guest { thread: old_thread },
            None,
            self.runtime_instance(callee_instance),
            callee_async,
        )?;

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(guest_thread)?;
        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");

        Ok(())
    }
2763
2764    /// Call the specified callback function for an async-lifted export.
2765    ///
2766    /// SAFETY: `function` must be a valid reference to a guest function of the
2767    /// correct signature for a callback.
2768    unsafe fn call_callback<T>(
2769        self,
2770        mut store: StoreContextMut<T>,
2771        function: SendSyncPtr<VMFuncRef>,
2772        event: Event,
2773        handle: u32,
2774    ) -> Result<u32> {
2775        let (ordinal, result) = event.parts();
2776        let params = &mut [
2777            ValRaw::u32(ordinal),
2778            ValRaw::u32(handle),
2779            ValRaw::u32(result),
2780        ];
2781        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2782        // `wasmtime-cranelift`-generated fused adapter code or
2783        // `component::Options`.  Per `wasmparser` callback signature
2784        // validation, we know it takes three parameters and returns one.
2785        unsafe {
2786            crate::Func::call_unchecked_raw(
2787                &mut store,
2788                function.as_non_null(),
2789                params.as_mut_slice().into(),
2790            )?;
2791        }
2792        Ok(params[0].get_u32())
2793    }
2794
2795    /// Start a guest->guest call previously prepared using
2796    /// `Self::prepare_call`.
2797    ///
2798    /// This is called from fused adapter code generated in
2799    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2800    /// this function immediately after calling `Self::prepare_call`.
2801    ///
2802    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2803    /// functions with the appropriate signatures for the current guest task.
2804    /// If this is a call to an async-lowered import, the actual call may be
2805    /// deferred and run after this function returns, in which case the pointer
2806    /// arguments must also be valid when the call happens.
2807    unsafe fn start_call<T: 'static>(
2808        self,
2809        mut store: StoreContextMut<T>,
2810        callback: *mut VMFuncRef,
2811        post_return: *mut VMFuncRef,
2812        callee: NonNull<VMFuncRef>,
2813        param_count: u32,
2814        result_count: u32,
2815        flags: u32,
2816        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2817    ) -> Result<u32> {
2818        let token = StoreToken::new(store.as_context_mut());
2819        let async_caller = storage.is_none();
2820        let state = store.0.concurrent_state_mut();
2821        let guest_thread = state.current_guest_thread()?;
2822        let callee_async = state.get_mut(guest_thread.task)?.async_function;
2823        let callee = SendSyncPtr::new(callee);
2824        let param_count = usize::try_from(param_count)?;
2825        assert!(param_count <= MAX_FLAT_PARAMS);
2826        let result_count = usize::try_from(result_count)?;
2827        assert!(result_count <= MAX_FLAT_RESULTS);
2828
2829        let task = state.get_mut(guest_thread.task)?;
2830        if let Some(callback) = NonNull::new(callback) {
2831            // We're calling an async-lifted export with a callback, so store
2832            // the callback and related context as part of the task so we can
2833            // call it later when needed.
2834            let callback = SendSyncPtr::new(callback);
2835            task.callback = Some(Box::new(move |store, event, handle| {
2836                let store = token.as_context_mut(store);
2837                unsafe { self.call_callback::<T>(store, callback, event, handle) }
2838            }));
2839        }
2840
2841        let Caller::Guest { thread: caller } = &task.caller else {
2842            // As of this writing, `start_call` is only used for guest->guest
2843            // calls.
2844            bail_bug!("start_call unexpectedly invoked for host->guest call");
2845        };
2846        let caller = *caller;
2847        let caller_instance = state.get_mut(caller.task)?.instance;
2848
2849        // Queue the call as a "high priority" work item.
2850        unsafe {
2851            self.queue_call(
2852                store.as_context_mut(),
2853                guest_thread,
2854                callee,
2855                param_count,
2856                result_count,
2857                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2858                NonNull::new(callback).map(SendSyncPtr::new),
2859                NonNull::new(post_return).map(SendSyncPtr::new),
2860            )?;
2861        }
2862
2863        let state = store.0.concurrent_state_mut();
2864
2865        // Use the caller's `GuestThread::sync_call_set` to register interest in
2866        // the subtask...
2867        let guest_waitable = Waitable::Guest(guest_thread.task);
2868        let old_set = guest_waitable.common(state)?.set;
2869        let set = state.get_mut(caller.thread)?.sync_call_set;
2870        guest_waitable.join(state, Some(set))?;
2871
2872        store.0.set_thread(CurrentThread::None)?;
2873
2874        // ... and suspend this fiber temporarily while we wait for it to start.
2875        //
2876        // Note that we _could_ call the callee directly using the current fiber
2877        // rather than suspend this one, but that would make reasoning about the
2878        // event loop more complicated and is probably only worth doing if
2879        // there's a measurable performance benefit.  In addition, it would mean
2880        // blocking the caller if the callee calls a blocking sync-lowered
2881        // import, and as of this writing the spec says we must not do that.
2882        //
2883        // Alternatively, the fused adapter code could be modified to call the
2884        // callee directly without calling a host-provided intrinsic at all (in
2885        // which case it would need to do its own, inline backpressure checks,
2886        // etc.).  Again, we'd want to see a measurable performance benefit
2887        // before committing to such an optimization.  And again, we'd need to
2888        // update the spec to allow that.
2889        let (status, waitable) = loop {
2890            store.0.suspend(SuspendReason::Waiting {
2891                set,
2892                thread: caller,
2893                // Normally, `StoreOpaque::suspend` would assert it's being
2894                // called from a context where blocking is allowed.  However, if
2895                // `async_caller` is `true`, we'll only "block" long enough for
2896                // the callee to start, i.e. we won't repeat this loop, so we
2897                // tell `suspend` it's okay even if we're not allowed to block.
2898                // Alternatively, if the callee is not an async function, then
2899                // we know it won't block anyway.
2900                skip_may_block_check: async_caller || !callee_async,
2901            })?;
2902
2903            let state = store.0.concurrent_state_mut();
2904
2905            log::trace!("taking event for {:?}", guest_thread.task);
2906            let event = guest_waitable.take_event(state)?;
2907            let Some(Event::Subtask { status }) = event else {
2908                bail_bug!("subtasks should only get subtask events, got {event:?}")
2909            };
2910
2911            log::trace!("status {status:?} for {:?}", guest_thread.task);
2912
2913            if status == Status::Returned {
2914                // It returned, so we can stop waiting.
2915                break (status, None);
2916            } else if async_caller {
2917                // It hasn't returned yet, but the caller is calling via an
2918                // async-lowered import, so we generate a handle for the task
2919                // waitable and return the status.
2920                let handle = store
2921                    .0
2922                    .instance_state(caller_instance)
2923                    .handle_table()
2924                    .subtask_insert_guest(guest_thread.task.rep())?;
2925                store
2926                    .0
2927                    .concurrent_state_mut()
2928                    .get_mut(guest_thread.task)?
2929                    .common
2930                    .handle = Some(handle);
2931                break (status, Some(handle));
2932            } else {
2933                // The callee hasn't returned yet, and the caller is calling via
2934                // a sync-lowered import, so we loop and keep waiting until the
2935                // callee returns.
2936            }
2937        };
2938
2939        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2940
2941        // Reset the current thread to point to the caller as it resumes control.
2942        store.0.set_thread(caller)?;
2943        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2944        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2945
2946        if let Some(storage) = storage {
2947            // The caller used a sync-lowered import to call an async-lifted
2948            // export, in which case the result, if any, has been stashed in
2949            // `GuestTask::sync_result`.
2950            let state = store.0.concurrent_state_mut();
2951            let task = state.get_mut(guest_thread.task)?;
2952            if let Some(result) = task.sync_result.take()? {
2953                if let Some(result) = result {
2954                    storage[0] = MaybeUninit::new(result);
2955                }
2956
2957                if task.exited && task.ready_to_delete() {
2958                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2959                }
2960            }
2961        }
2962
2963        Ok(status.pack(waitable))
2964    }
2965
2966    /// Poll the specified future once on behalf of a guest->host call using an
2967    /// async-lowered import.
2968    ///
2969    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2970    /// `Pending`, add it to the set of futures to be polled as part of this
2971    /// instance's event loop until it completes, and then return
2972    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2973    ///
2974    /// Whether the future returns `Ready` immediately or later, the `lower`
2975    /// function will be used to lower the result, if any, into the guest caller's
2976    /// stack and linear memory. The `lower` function is invoked with `None` if
2977    /// the future is cancelled.
2978    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2979        self,
2980        mut store: StoreContextMut<'_, T>,
2981        future: impl Future<Output = Result<R>> + Send + 'static,
2982        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2983    ) -> Result<Option<u32>> {
2984        let token = StoreToken::new(store.as_context_mut());
2985        let state = store.0.concurrent_state_mut();
2986        let task = state.current_host_thread()?;
2987
2988        // Create an abortable future which hooks calls to poll and manages call
2989        // context state for the future.
2990        let (join_handle, future) = JoinHandle::run(future);
2991        {
2992            let state = &mut state.get_mut(task)?.state;
2993            assert!(matches!(state, HostTaskState::CalleeStarted));
2994            *state = HostTaskState::CalleeRunning(join_handle);
2995        }
2996
2997        let mut future = Box::pin(future);
2998
2999        // Finally, poll the future.  We can use a dummy `Waker` here because
3000        // we'll add the future to `ConcurrentState::futures` and poll it
3001        // automatically from the event loop if it doesn't complete immediately
3002        // here.
3003        let poll = tls::set(store.0, || {
3004            future
3005                .as_mut()
3006                .poll(&mut Context::from_waker(&Waker::noop()))
3007        });
3008
3009        match poll {
3010            // It finished immediately; lower the result and delete the task.
3011            Poll::Ready(result) => {
3012                let result = result.transpose()?;
3013                lower(store.as_context_mut(), result)?;
3014                return Ok(None);
3015            }
3016
3017            // Future isn't ready yet, so fall through.
3018            Poll::Pending => {}
3019        }
3020
3021        // It hasn't finished yet; add the future to
3022        // `ConcurrentState::futures` so it will be polled by the event
3023        // loop and allocate a waitable handle to return to the guest.
3024
3025        // Wrap the future in a closure responsible for lowering the result into
3026        // the guest's stack and memory, as well as notifying any waiters that
3027        // the task returned.
3028        let future = Box::pin(async move {
3029            let result = match future.await {
3030                Some(result) => Some(result?),
3031                None => None,
3032            };
3033            let on_complete = move |store: &mut dyn VMStore| {
3034                // Restore the `current_thread` to be the host so `lower` knows
3035                // how to manipulate borrows and knows which scope of borrows
3036                // to check.
3037                let mut store = token.as_context_mut(store);
3038                let old = store.0.set_thread(task)?;
3039
3040                let status = if result.is_some() {
3041                    Status::Returned
3042                } else {
3043                    Status::ReturnCancelled
3044                };
3045
3046                lower(store.as_context_mut(), result)?;
3047                let state = store.0.concurrent_state_mut();
3048                match &mut state.get_mut(task)?.state {
3049                    // The task is already flagged as finished because it was
3050                    // cancelled. No need to transition further.
3051                    HostTaskState::CalleeDone { .. } => {}
3052
3053                    // Otherwise transition this task to the done state.
3054                    other => *other = HostTaskState::CalleeDone { cancelled: false },
3055                }
3056                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3057
3058                store.0.set_thread(old)?;
3059                Ok(())
3060            };
3061
3062            // Here we schedule a task to run on a worker fiber to do the
3063            // lowering since it may involve a call to the guest's realloc
3064            // function. This is necessary because calling the guest while
3065            // there are host embedder frames on the stack is unsound.
3066            tls::get(move |store| {
3067                store
3068                    .concurrent_state_mut()
3069                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3070                        on_complete,
3071                    ))));
3072                Ok(())
3073            })
3074        });
3075
3076        // Make this task visible to the guest and then record what it
3077        // was made visible as.
3078        let state = store.0.concurrent_state_mut();
3079        state.push_future(future);
3080        let caller = state.get_mut(task)?.caller;
3081        let instance = state.get_mut(caller.task)?.instance;
3082        let handle = store
3083            .0
3084            .instance_state(instance)
3085            .handle_table()
3086            .subtask_insert_host(task.rep())?;
3087        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
3088        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3089
3090        // Restore the currently running thread to this host task's
3091        // caller. Note that the host task isn't deallocated as it's
3092        // within the store and will get deallocated later.
3093        store.0.set_thread(caller)?;
3094        Ok(Some(handle))
3095    }
3096
3097    /// Implements the `task.return` intrinsic, lifting the result for the
3098    /// current guest task.
3099    pub(crate) fn task_return(
3100        self,
3101        store: &mut dyn VMStore,
3102        ty: TypeTupleIndex,
3103        options: OptionsIndex,
3104        storage: &[ValRaw],
3105    ) -> Result<()> {
3106        let state = store.concurrent_state_mut();
3107        let guest_thread = state.current_guest_thread()?;
3108        let lift = state
3109            .get_mut(guest_thread.task)?
3110            .lift_result
3111            .take()
3112            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3113        if !state.get_mut(guest_thread.task)?.result.is_none() {
3114            bail_bug!("task result unexpectedly already set");
3115        }
3116
3117        let CanonicalOptions {
3118            string_encoding,
3119            data_model,
3120            ..
3121        } = &self.id().get(store).component().env_component().options[options];
3122
3123        let invalid = ty != lift.ty
3124            || string_encoding != &lift.string_encoding
3125            || match data_model {
3126                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3127                    Some(memory) => {
3128                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3129                        let actual = self.id().get(store).runtime_memory(memory);
3130                        expected != actual.as_ptr()
3131                    }
3132                    // Memory not specified, meaning it didn't need to be
3133                    // specified per validation, so not invalid.
3134                    None => false,
3135                },
3136                // Always invalid as this isn't supported.
3137                CanonicalOptionsDataModel::Gc { .. } => true,
3138            };
3139
3140        if invalid {
3141            bail!(Trap::TaskReturnInvalid);
3142        }
3143
3144        log::trace!("task.return for {guest_thread:?}");
3145
3146        let result = (lift.lift)(store, storage)?;
3147        self.task_complete(store, guest_thread.task, result, Status::Returned)
3148    }
3149
3150    /// Implements the `task.cancel` intrinsic.
3151    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3152        let state = store.concurrent_state_mut();
3153        let guest_thread = state.current_guest_thread()?;
3154        let task = state.get_mut(guest_thread.task)?;
3155        if !task.cancel_sent {
3156            bail!(Trap::TaskCancelNotCancelled);
3157        }
3158        _ = task
3159            .lift_result
3160            .take()
3161            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3162
3163        if !task.result.is_none() {
3164            bail_bug!("task result should not bet set yet");
3165        }
3166
3167        log::trace!("task.cancel for {guest_thread:?}");
3168
3169        self.task_complete(
3170            store,
3171            guest_thread.task,
3172            Box::new(DummyResult),
3173            Status::ReturnCancelled,
3174        )
3175    }
3176
3177    /// Complete the specified guest task (i.e. indicate that it has either
3178    /// returned a (possibly empty) result or cancelled itself).
3179    ///
3180    /// This will return any resource borrows and notify any current or future
3181    /// waiters that the task has completed.
3182    fn task_complete(
3183        self,
3184        store: &mut StoreOpaque,
3185        guest_task: TableId<GuestTask>,
3186        result: Box<dyn Any + Send + Sync>,
3187        status: Status,
3188    ) -> Result<()> {
3189        store
3190            .component_resource_tables(Some(self))
3191            .validate_scope_exit()?;
3192
3193        let state = store.concurrent_state_mut();
3194        let task = state.get_mut(guest_task)?;
3195
3196        if let Caller::Host { tx, .. } = &mut task.caller {
3197            if let Some(tx) = tx.take() {
3198                _ = tx.send(result);
3199            }
3200        } else {
3201            task.result = Some(result);
3202            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3203        }
3204
3205        Ok(())
3206    }
3207
3208    /// Implements the `waitable-set.new` intrinsic.
3209    pub(crate) fn waitable_set_new(
3210        self,
3211        store: &mut StoreOpaque,
3212        caller_instance: RuntimeComponentInstanceIndex,
3213    ) -> Result<u32> {
3214        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3215        let handle = store
3216            .instance_state(self.runtime_instance(caller_instance))
3217            .handle_table()
3218            .waitable_set_insert(set.rep())?;
3219        log::trace!("new waitable set {set:?} (handle {handle})");
3220        Ok(handle)
3221    }
3222
3223    /// Implements the `waitable-set.drop` intrinsic.
3224    pub(crate) fn waitable_set_drop(
3225        self,
3226        store: &mut StoreOpaque,
3227        caller_instance: RuntimeComponentInstanceIndex,
3228        set: u32,
3229    ) -> Result<()> {
3230        let rep = store
3231            .instance_state(self.runtime_instance(caller_instance))
3232            .handle_table()
3233            .waitable_set_remove(set)?;
3234
3235        log::trace!("drop waitable set {rep} (handle {set})");
3236
3237        // Note that we're careful to check for waiters _before_ deleting the
3238        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3239        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3240        if !store
3241            .concurrent_state_mut()
3242            .get_mut(TableId::<WaitableSet>::new(rep))?
3243            .waiting
3244            .is_empty()
3245        {
3246            bail!(Trap::WaitableSetDropHasWaiters);
3247        }
3248
3249        store
3250            .concurrent_state_mut()
3251            .delete(TableId::<WaitableSet>::new(rep))?;
3252
3253        Ok(())
3254    }
3255
3256    /// Implements the `waitable.join` intrinsic.
3257    pub(crate) fn waitable_join(
3258        self,
3259        store: &mut StoreOpaque,
3260        caller_instance: RuntimeComponentInstanceIndex,
3261        waitable_handle: u32,
3262        set_handle: u32,
3263    ) -> Result<()> {
3264        let mut instance = self.id().get_mut(store);
3265        let waitable =
3266            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3267
3268        let set = if set_handle == 0 {
3269            None
3270        } else {
3271            let set = instance.instance_states().0[caller_instance]
3272                .handle_table()
3273                .waitable_set_rep(set_handle)?;
3274
3275            let state = store.concurrent_state_mut();
3276            if let Some(old) = waitable.common(state)?.set
3277                && state.get_mut(old)?.is_sync_call_set
3278            {
3279                bail!(Trap::WaitableSyncAndAsync);
3280            }
3281
3282            Some(TableId::<WaitableSet>::new(set))
3283        };
3284
3285        log::trace!(
3286            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3287        );
3288
3289        waitable.join(store.concurrent_state_mut(), set)
3290    }
3291
3292    /// Implements the `subtask.drop` intrinsic.
3293    pub(crate) fn subtask_drop(
3294        self,
3295        store: &mut StoreOpaque,
3296        caller_instance: RuntimeComponentInstanceIndex,
3297        task_id: u32,
3298    ) -> Result<()> {
3299        self.waitable_join(store, caller_instance, task_id, 0)?;
3300
3301        let (rep, is_host) = store
3302            .instance_state(self.runtime_instance(caller_instance))
3303            .handle_table()
3304            .subtask_remove(task_id)?;
3305
3306        let concurrent_state = store.concurrent_state_mut();
3307        let (waitable, delete) = if is_host {
3308            let id = TableId::<HostTask>::new(rep);
3309            let task = concurrent_state.get_mut(id)?;
3310            match &task.state {
3311                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3312                HostTaskState::CalleeDone { .. } => {}
3313                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3314                    bail_bug!("invalid state for callee in `subtask.drop`")
3315                }
3316            }
3317            (Waitable::Host(id), true)
3318        } else {
3319            let id = TableId::<GuestTask>::new(rep);
3320            let task = concurrent_state.get_mut(id)?;
3321            if task.lift_result.is_some() {
3322                bail!(Trap::SubtaskDropNotResolved);
3323            }
3324            (
3325                Waitable::Guest(id),
3326                concurrent_state.get_mut(id)?.ready_to_delete(),
3327            )
3328        };
3329
3330        waitable.common(concurrent_state)?.handle = None;
3331
3332        // If this subtask has an event that means that the terminal status of
3333        // this subtask wasn't yet received so it can't be dropped yet.
3334        if waitable.take_event(concurrent_state)?.is_some() {
3335            bail!(Trap::SubtaskDropNotResolved);
3336        }
3337
3338        if delete {
3339            waitable.delete_from(concurrent_state)?;
3340        }
3341
3342        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3343        Ok(())
3344    }
3345
3346    /// Implements the `waitable-set.wait` intrinsic.
3347    pub(crate) fn waitable_set_wait(
3348        self,
3349        store: &mut StoreOpaque,
3350        options: OptionsIndex,
3351        set: u32,
3352        payload: u32,
3353    ) -> Result<u32> {
3354        if !self.options(store, options).async_ {
3355            // The caller may only call `waitable-set.wait` from an async task
3356            // (i.e. a task created via a call to an async export).
3357            // Otherwise, we'll trap.
3358            store.check_blocking()?;
3359        }
3360
3361        let &CanonicalOptions {
3362            cancellable,
3363            instance: caller_instance,
3364            ..
3365        } = &self.id().get(store).component().env_component().options[options];
3366        let rep = store
3367            .instance_state(self.runtime_instance(caller_instance))
3368            .handle_table()
3369            .waitable_set_rep(set)?;
3370
3371        self.waitable_check(
3372            store,
3373            cancellable,
3374            WaitableCheck::Wait,
3375            WaitableCheckParams {
3376                set: TableId::new(rep),
3377                options,
3378                payload,
3379            },
3380        )
3381    }
3382
3383    /// Implements the `waitable-set.poll` intrinsic.
3384    pub(crate) fn waitable_set_poll(
3385        self,
3386        store: &mut StoreOpaque,
3387        options: OptionsIndex,
3388        set: u32,
3389        payload: u32,
3390    ) -> Result<u32> {
3391        let &CanonicalOptions {
3392            cancellable,
3393            instance: caller_instance,
3394            ..
3395        } = &self.id().get(store).component().env_component().options[options];
3396        let rep = store
3397            .instance_state(self.runtime_instance(caller_instance))
3398            .handle_table()
3399            .waitable_set_rep(set)?;
3400
3401        self.waitable_check(
3402            store,
3403            cancellable,
3404            WaitableCheck::Poll,
3405            WaitableCheckParams {
3406                set: TableId::new(rep),
3407                options,
3408                payload,
3409            },
3410        )
3411    }
3412
3413    /// Implements the `thread.index` intrinsic.
3414    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3415        let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3416        match store
3417            .concurrent_state_mut()
3418            .get_mut(thread_id)?
3419            .instance_rep
3420        {
3421            Some(r) => Ok(r),
3422            None => bail_bug!("thread should have instance_rep by now"),
3423        }
3424    }
3425
3426    /// Implements the `thread.new-indirect` intrinsic.
3427    pub(crate) fn thread_new_indirect<T: 'static>(
3428        self,
3429        mut store: StoreContextMut<T>,
3430        runtime_instance: RuntimeComponentInstanceIndex,
3431        _func_ty_idx: TypeFuncIndex, // currently unused
3432        start_func_table_idx: RuntimeTableIndex,
3433        start_func_idx: u32,
3434        context: i32,
3435    ) -> Result<u32> {
3436        log::trace!("creating new thread");
3437
3438        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3439        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3440        let callee = instance
3441            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3442            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3443        if callee.type_index(store.0) != start_func_ty.type_index() {
3444            bail!(Trap::ThreadNewIndirectInvalidType);
3445        }
3446
3447        let token = StoreToken::new(store.as_context_mut());
3448        let start_func = Box::new(
3449            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3450                let old_thread = store.set_thread(guest_thread)?;
3451                log::trace!(
3452                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3453                );
3454
3455                let mut store = token.as_context_mut(store);
3456                let mut params = [ValRaw::i32(context)];
3457                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3458                // on a separate fiber if we're running in an async store.
3459                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3460
3461                store.0.set_thread(old_thread)?;
3462
3463                store.0.cleanup_thread(
3464                    guest_thread,
3465                    self.runtime_instance(runtime_instance),
3466                    CleanupTask::Yes,
3467                )?;
3468                log::trace!("explicit thread {guest_thread:?} completed");
3469                let state = store.0.concurrent_state_mut();
3470                if let Some(t) = old_thread.guest() {
3471                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
3472                }
3473                log::trace!("thread start: restored {old_thread:?} as current thread");
3474
3475                Ok(())
3476            },
3477        );
3478
3479        let state = store.0.concurrent_state_mut();
3480        let current_thread = state.current_guest_thread()?;
3481        let parent_task = current_thread.task;
3482
3483        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3484        let thread_id = state.push(new_thread)?;
3485        state.get_mut(parent_task)?.threads.insert(thread_id);
3486
3487        log::trace!("new thread with id {thread_id:?} created");
3488
3489        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3490    }
3491
3492    pub(crate) fn resume_thread(
3493        self,
3494        store: &mut StoreOpaque,
3495        runtime_instance: RuntimeComponentInstanceIndex,
3496        thread_idx: u32,
3497        high_priority: bool,
3498        allow_ready: bool,
3499    ) -> Result<()> {
3500        let thread_id =
3501            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3502        let state = store.concurrent_state_mut();
3503        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3504        let thread = state.get_mut(guest_thread.thread)?;
3505
3506        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3507            GuestThreadState::NotStartedExplicit(start_func) => {
3508                log::trace!("starting thread {guest_thread:?}");
3509                let guest_call = WorkItem::GuestCall(
3510                    runtime_instance,
3511                    GuestCall {
3512                        thread: guest_thread,
3513                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3514                            start_func(store, guest_thread)
3515                        })),
3516                    },
3517                );
3518                store
3519                    .concurrent_state_mut()
3520                    .push_work_item(guest_call, high_priority);
3521            }
3522            GuestThreadState::Suspended(fiber) => {
3523                log::trace!("resuming thread {thread_id:?} that was suspended");
3524                store
3525                    .concurrent_state_mut()
3526                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3527            }
3528            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3529                log::trace!("resuming thread {thread_id:?} that was ready");
3530                thread.state = GuestThreadState::Ready { fiber, cancellable };
3531                store
3532                    .concurrent_state_mut()
3533                    .promote_thread_work_item(guest_thread);
3534            }
3535            other => {
3536                thread.state = other;
3537                bail!(Trap::CannotResumeThread);
3538            }
3539        }
3540        Ok(())
3541    }
3542
3543    fn add_guest_thread_to_instance_table(
3544        self,
3545        thread_id: TableId<GuestThread>,
3546        store: &mut StoreOpaque,
3547        runtime_instance: RuntimeComponentInstanceIndex,
3548    ) -> Result<u32> {
3549        let guest_id = store
3550            .instance_state(self.runtime_instance(runtime_instance))
3551            .thread_handle_table()
3552            .guest_thread_insert(thread_id.rep())?;
3553        store
3554            .concurrent_state_mut()
3555            .get_mut(thread_id)?
3556            .instance_rep = Some(guest_id);
3557        Ok(guest_id)
3558    }
3559
3560    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3561    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3562    pub(crate) fn suspension_intrinsic(
3563        self,
3564        store: &mut StoreOpaque,
3565        caller: RuntimeComponentInstanceIndex,
3566        cancellable: bool,
3567        yielding: bool,
3568        to_thread: SuspensionTarget,
3569    ) -> Result<WaitResult> {
3570        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3571        if to_thread.is_none() {
3572            let state = store.concurrent_state_mut();
3573            if yielding {
3574                // This is a `thread.yield` call
3575                if !state.may_block(guest_thread.task)? {
3576                    // In a non-blocking context, a `thread.yield` may trigger
3577                    // other threads in the same component instance to run.
3578                    if !state.promote_instance_local_thread_work_item(caller) {
3579                        // No other threads are runnable, so just return
3580                        return Ok(WaitResult::Completed);
3581                    }
3582                }
3583            } else {
3584                // The caller may only call `thread.suspend` from an async task
3585                // (i.e. a task created via a call to an async export).
3586                // Otherwise, we'll trap.
3587                store.check_blocking()?;
3588            }
3589        }
3590
3591        // There could be a pending cancellation from a previous uncancellable wait
3592        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3593            return Ok(WaitResult::Cancelled);
3594        }
3595
3596        match to_thread {
3597            SuspensionTarget::SomeSuspended(thread) => {
3598                self.resume_thread(store, caller, thread, true, false)?
3599            }
3600            SuspensionTarget::Some(thread) => {
3601                self.resume_thread(store, caller, thread, true, true)?
3602            }
3603            SuspensionTarget::None => { /* nothing to do */ }
3604        }
3605
3606        let reason = if yielding {
3607            SuspendReason::Yielding {
3608                thread: guest_thread,
3609                cancellable,
3610                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3611                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3612                // panic if we called it in a non-blocking context.
3613                skip_may_block_check: to_thread.is_some(),
3614            }
3615        } else {
3616            SuspendReason::ExplicitlySuspending {
3617                thread: guest_thread,
3618                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3619                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3620                // panic if we called it in a non-blocking context.
3621                skip_may_block_check: to_thread.is_some(),
3622            }
3623        };
3624
3625        store.suspend(reason)?;
3626
3627        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3628            Ok(WaitResult::Cancelled)
3629        } else {
3630            Ok(WaitResult::Completed)
3631        }
3632    }
3633
3634    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3635    fn waitable_check(
3636        self,
3637        store: &mut StoreOpaque,
3638        cancellable: bool,
3639        check: WaitableCheck,
3640        params: WaitableCheckParams,
3641    ) -> Result<u32> {
3642        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3643
3644        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3645
3646        let state = store.concurrent_state_mut();
3647        let task = state.get_mut(guest_thread.task)?;
3648
3649        // If we're waiting, and there are no events immediately available,
3650        // suspend the fiber until that changes.
3651        match &check {
3652            WaitableCheck::Wait => {
3653                let set = params.set;
3654
3655                if (task.event.is_none()
3656                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3657                    && state.get_mut(set)?.ready.is_empty()
3658                {
3659                    if cancellable {
3660                        let old = state
3661                            .get_mut(guest_thread.thread)?
3662                            .wake_on_cancel
3663                            .replace(set);
3664                        if !old.is_none() {
3665                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3666                        }
3667                    }
3668
3669                    store.suspend(SuspendReason::Waiting {
3670                        set,
3671                        thread: guest_thread,
3672                        skip_may_block_check: false,
3673                    })?;
3674                }
3675            }
3676            WaitableCheck::Poll => {}
3677        }
3678
3679        log::trace!(
3680            "waitable check for {guest_thread:?}; set {:?}, part two",
3681            params.set
3682        );
3683
3684        // Deliver any pending events to the guest and return.
3685        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3686
3687        let (ordinal, handle, result) = match &check {
3688            WaitableCheck::Wait => {
3689                let (event, waitable) = match event {
3690                    Some(p) => p,
3691                    None => bail_bug!("event expected to be present"),
3692                };
3693                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3694                let (ordinal, result) = event.parts();
3695                (ordinal, handle, result)
3696            }
3697            WaitableCheck::Poll => {
3698                if let Some((event, waitable)) = event {
3699                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3700                    let (ordinal, result) = event.parts();
3701                    (ordinal, handle, result)
3702                } else {
3703                    log::trace!(
3704                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3705                        guest_thread.task,
3706                        params.set
3707                    );
3708                    let (ordinal, result) = Event::None.parts();
3709                    (ordinal, 0, result)
3710                }
3711            }
3712        };
3713        let memory = self.options_memory_mut(store, params.options);
3714        let ptr = func::validate_inbounds_dynamic(
3715            &CanonicalAbiInfo::POINTER_PAIR,
3716            memory,
3717            &ValRaw::u32(params.payload),
3718        )?;
3719        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3720        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3721        Ok(ordinal)
3722    }
3723
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// `task_id` is resolved through `caller_instance`'s handle table to
    /// either a host or a guest subtask, which is then asked to cancel.
    ///
    /// Returns one of:
    ///
    /// - `Status::StartCancelled` if a guest subtask had not yet lowered its
    ///   parameters,
    /// - `BLOCKED` if `async_` is true and the subtask has not yet reached a
    ///   terminal state,
    /// - `Status::Returned` or `Status::ReturnCancelled`, taken from the
    ///   subtask's pending event, otherwise.
    ///
    /// Fails with `Trap::SubtaskCancelAfterTerminal` if a host subtask was
    /// already cancelled, or if no terminal subtask event is pending once
    /// blocking (if any) has finished.  When `async_` is false,
    /// `StoreOpaque::check_blocking` must also succeed.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        // Translate the guest-visible handle into a table rep, and note
        // whether it names a host task or a guest task.
        let (rep, is_host) = store
            .instance_state(self.runtime_instance(caller_instance))
            .handle_table()
            .subtask_rep(task_id)?;
        let waitable = if is_host {
            Waitable::Host(TableId::<HostTask>::new(rep))
        } else {
            Waitable::Guest(TableId::<GuestTask>::new(rep))
        };
        let concurrent_state = store.concurrent_state_mut();

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Whether the subtask's terminal status is still outstanding and
        // therefore has to be waited for (or reported as `BLOCKED`).
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            // Unconditionally mark the task as cancelled, then decide what to
            // do based on the state it was in beforehand.
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                // If the callee is still running, signal an abort is requested.
                //
                // After cancelling this falls through to block waiting for the
                // host task to actually finish assuming that `async_` is false.
                // This blocking behavior resolves the race of `handle.abort()`
                // with the task actually getting cancelled or finishing.
                HostTaskState::CalleeRunning(handle) => {
                    handle.abort();
                    needs_block = true;
                }

                // Cancellation was already requested, so fail as the task can't
                // be cancelled twice.
                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        // The callee is already done so there's no need to
                        // block further for an event.
                        needs_block = false;
                    }
                }

                // These states should not be possible for a subtask that's
                // visible from the guest, so trap here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // The callee never received its parameters, so it can be
                // cancelled outright without involving it.
                store.cancel_guest_subtask_without_lowered_parameters(
                    self.runtime_instance(caller_instance),
                    guest_task,
                )?;
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // Look for the first thread of the task that can observe the
                // cancellation right now; at most one thread is resumed.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
                    if let Some(set) = thread_mut.wake_on_cancel.take() {
                        // The thread is in a cancellable wait, so wake it up:
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield the calling thread after queueing the
                        // high-priority work item for the woken thread.
                        let caller = concurrent_state.current_guest_thread()?;
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        break;
                    } else if let GuestThreadState::Ready {
                        cancellable: true, ..
                    } = &thread_mut.state
                    {
                        // The thread is in a cancellable yield, so yield back
                        // to it.
                        let caller = concurrent_state.current_guest_thread()?;
                        concurrent_state.promote_thread_work_item(thread);
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Guest tasks need to block if they have not yet returned or
                // cancelled, even as a result of the event delivery above.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                // The task already returned or was cancelled, so its terminal
                // status needs no further waiting.
                needs_block = false;
            }
        };

        // If we need to block waiting on the terminal status of this subtask
        // then return immediately in `async` mode, or otherwise wait for the
        // event to get signaled through the store.
        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done, fall through to the shared event handling below.
            store.wait_for_event(waitable)?;

            // .. fall through to determine what event's in store for us.
        }

        // Only a terminal subtask event is an acceptable outcome here;
        // anything else means the subtask was not in a cancellable position.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }
3891}
3892
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where
    /// one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    ///
    /// `storage` must point to at least `storage_len` readable `ValRaw`
    /// items.  In the `StoreInner<T>` implementation, `result_count` equal to
    /// `PREPARE_ASYNC_NO_RESULT` or `PREPARE_ASYNC_WITH_RESULT` denotes an
    /// async caller (without or with a result, respectively); any other value
    /// is taken as a sync caller's result count.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage` must point to at least `storage_len` items, which the
    /// implementation may access mutably for the duration of the call.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    ///
    /// NOTE(review): the meaning of the returned `u32` is determined by
    /// `Instance::start_call`, which is not visible here -- confirm before
    /// relying on it.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    ///
    /// `future` is the guest's handle and `address` the guest memory address
    /// of the payload.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    ///
    /// `future` is the guest's handle and `address` the guest memory address
    /// of the destination buffer.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    ///
    /// `count` is the number of items offered starting at `address`.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    ///
    /// `count` is the number of items the buffer at `address` can accept.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    ///
    /// `err_ctx_handle` is the guest's error-context handle and
    /// `debug_msg_address` the guest memory address associated with the
    /// message.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    ///
    /// The start function is identified by `start_func_idx` within the table
    /// `start_func_table_idx`.
    /// NOTE(review): the returned `u32` is presumably the new thread's
    /// handle -- confirm against `Instance::thread_new_indirect`.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4060
4061/// SAFETY: See trait docs.
4062impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4063    unsafe fn prepare_call(
4064        &mut self,
4065        instance: Instance,
4066        memory: *mut VMMemoryDefinition,
4067        start: NonNull<VMFuncRef>,
4068        return_: NonNull<VMFuncRef>,
4069        caller_instance: RuntimeComponentInstanceIndex,
4070        callee_instance: RuntimeComponentInstanceIndex,
4071        task_return_type: TypeTupleIndex,
4072        callee_async: bool,
4073        string_encoding: StringEncoding,
4074        result_count_or_max_if_async: u32,
4075        storage: *mut ValRaw,
4076        storage_len: usize,
4077    ) -> Result<()> {
4078        // SAFETY: The `wasmtime_cranelift`-generated code that calls
4079        // this method will have ensured that `storage` is a valid
4080        // pointer containing at least `storage_len` items.
4081        let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4082
4083        unsafe {
4084            instance.prepare_call(
4085                StoreContextMut(self),
4086                start,
4087                return_,
4088                caller_instance,
4089                callee_instance,
4090                task_return_type,
4091                callee_async,
4092                memory,
4093                string_encoding,
4094                match result_count_or_max_if_async {
4095                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4096                        params,
4097                        has_result: false,
4098                    },
4099                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4100                        params,
4101                        has_result: true,
4102                    },
4103                    result_count => CallerInfo::Sync {
4104                        params,
4105                        result_count,
4106                    },
4107                },
4108            )
4109        }
4110    }
4111
4112    unsafe fn sync_start(
4113        &mut self,
4114        instance: Instance,
4115        callback: *mut VMFuncRef,
4116        callee: NonNull<VMFuncRef>,
4117        param_count: u32,
4118        storage: *mut MaybeUninit<ValRaw>,
4119        storage_len: usize,
4120    ) -> Result<()> {
4121        unsafe {
4122            instance
4123                .start_call(
4124                    StoreContextMut(self),
4125                    callback,
4126                    ptr::null_mut(),
4127                    callee,
4128                    param_count,
4129                    1,
4130                    START_FLAG_ASYNC_CALLEE,
4131                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4132                    // this method will have ensured that `storage` is a valid
4133                    // pointer containing at least `storage_len` items.
4134                    Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4135                )
4136                .map(drop)
4137        }
4138    }
4139
4140    unsafe fn async_start(
4141        &mut self,
4142        instance: Instance,
4143        callback: *mut VMFuncRef,
4144        post_return: *mut VMFuncRef,
4145        callee: NonNull<VMFuncRef>,
4146        param_count: u32,
4147        result_count: u32,
4148        flags: u32,
4149    ) -> Result<u32> {
4150        unsafe {
4151            instance.start_call(
4152                StoreContextMut(self),
4153                callback,
4154                post_return,
4155                callee,
4156                param_count,
4157                result_count,
4158                flags,
4159                None,
4160            )
4161        }
4162    }
4163
4164    fn future_write(
4165        &mut self,
4166        instance: Instance,
4167        caller: RuntimeComponentInstanceIndex,
4168        ty: TypeFutureTableIndex,
4169        options: OptionsIndex,
4170        future: u32,
4171        address: u32,
4172    ) -> Result<u32> {
4173        instance
4174            .guest_write(
4175                StoreContextMut(self),
4176                caller,
4177                TransmitIndex::Future(ty),
4178                options,
4179                None,
4180                future,
4181                address,
4182                1,
4183            )
4184            .map(|result| result.encode())
4185    }
4186
4187    fn future_read(
4188        &mut self,
4189        instance: Instance,
4190        caller: RuntimeComponentInstanceIndex,
4191        ty: TypeFutureTableIndex,
4192        options: OptionsIndex,
4193        future: u32,
4194        address: u32,
4195    ) -> Result<u32> {
4196        instance
4197            .guest_read(
4198                StoreContextMut(self),
4199                caller,
4200                TransmitIndex::Future(ty),
4201                options,
4202                None,
4203                future,
4204                address,
4205                1,
4206            )
4207            .map(|result| result.encode())
4208    }
4209
4210    fn stream_write(
4211        &mut self,
4212        instance: Instance,
4213        caller: RuntimeComponentInstanceIndex,
4214        ty: TypeStreamTableIndex,
4215        options: OptionsIndex,
4216        stream: u32,
4217        address: u32,
4218        count: u32,
4219    ) -> Result<u32> {
4220        instance
4221            .guest_write(
4222                StoreContextMut(self),
4223                caller,
4224                TransmitIndex::Stream(ty),
4225                options,
4226                None,
4227                stream,
4228                address,
4229                count,
4230            )
4231            .map(|result| result.encode())
4232    }
4233
4234    fn stream_read(
4235        &mut self,
4236        instance: Instance,
4237        caller: RuntimeComponentInstanceIndex,
4238        ty: TypeStreamTableIndex,
4239        options: OptionsIndex,
4240        stream: u32,
4241        address: u32,
4242        count: u32,
4243    ) -> Result<u32> {
4244        instance
4245            .guest_read(
4246                StoreContextMut(self),
4247                caller,
4248                TransmitIndex::Stream(ty),
4249                options,
4250                None,
4251                stream,
4252                address,
4253                count,
4254            )
4255            .map(|result| result.encode())
4256    }
4257
4258    fn future_drop_writable(
4259        &mut self,
4260        instance: Instance,
4261        ty: TypeFutureTableIndex,
4262        writer: u32,
4263    ) -> Result<()> {
4264        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4265    }
4266
4267    fn flat_stream_write(
4268        &mut self,
4269        instance: Instance,
4270        caller: RuntimeComponentInstanceIndex,
4271        ty: TypeStreamTableIndex,
4272        options: OptionsIndex,
4273        payload_size: u32,
4274        payload_align: u32,
4275        stream: u32,
4276        address: u32,
4277        count: u32,
4278    ) -> Result<u32> {
4279        instance
4280            .guest_write(
4281                StoreContextMut(self),
4282                caller,
4283                TransmitIndex::Stream(ty),
4284                options,
4285                Some(FlatAbi {
4286                    size: payload_size,
4287                    align: payload_align,
4288                }),
4289                stream,
4290                address,
4291                count,
4292            )
4293            .map(|result| result.encode())
4294    }
4295
4296    fn flat_stream_read(
4297        &mut self,
4298        instance: Instance,
4299        caller: RuntimeComponentInstanceIndex,
4300        ty: TypeStreamTableIndex,
4301        options: OptionsIndex,
4302        payload_size: u32,
4303        payload_align: u32,
4304        stream: u32,
4305        address: u32,
4306        count: u32,
4307    ) -> Result<u32> {
4308        instance
4309            .guest_read(
4310                StoreContextMut(self),
4311                caller,
4312                TransmitIndex::Stream(ty),
4313                options,
4314                Some(FlatAbi {
4315                    size: payload_size,
4316                    align: payload_align,
4317                }),
4318                stream,
4319                address,
4320                count,
4321            )
4322            .map(|result| result.encode())
4323    }
4324
4325    fn stream_drop_writable(
4326        &mut self,
4327        instance: Instance,
4328        ty: TypeStreamTableIndex,
4329        writer: u32,
4330    ) -> Result<()> {
4331        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4332    }
4333
4334    fn error_context_debug_message(
4335        &mut self,
4336        instance: Instance,
4337        ty: TypeComponentLocalErrorContextTableIndex,
4338        options: OptionsIndex,
4339        err_ctx_handle: u32,
4340        debug_msg_address: u32,
4341    ) -> Result<()> {
4342        instance.error_context_debug_message(
4343            StoreContextMut(self),
4344            ty,
4345            options,
4346            err_ctx_handle,
4347            debug_msg_address,
4348        )
4349    }
4350
4351    fn thread_new_indirect(
4352        &mut self,
4353        instance: Instance,
4354        caller: RuntimeComponentInstanceIndex,
4355        func_ty_idx: TypeFuncIndex,
4356        start_func_table_idx: RuntimeTableIndex,
4357        start_func_idx: u32,
4358        context: i32,
4359    ) -> Result<u32> {
4360        instance.thread_new_indirect(
4361            StoreContextMut(self),
4362            caller,
4363            func_ty_idx,
4364            start_func_table_idx,
4365            start_func_idx,
4366            context,
4367        )
4368    }
4369}
4370
/// A boxed, pinned, `Send + 'static` future that resolves to `Result<()>`.
///
/// NOTE(review): going by the name, this is the type-erased future backing a
/// host task; its uses fall outside this part of the file -- confirm.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4372
/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
pub(crate) struct HostTask {
    /// Bookkeeping common to waitables.
    // NOTE(review): presumably the pending event and waitable-set membership
    // -- confirm against `WaitableCommon`.
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Where this task currently is in its lifecycle; see `HostTaskState`.
    ///
    /// `subtask.cancel` replaces this with `CalleeDone { cancelled: true }`.
    state: HostTaskState,
}
4388
/// The lifecycle states of a `HostTask`.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block` which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
    ///
    /// `cancelled` is `true` once `subtask.cancel` has been applied to the
    /// task; a further `subtask.cancel` on it then traps.
    CalleeDone { cancelled: bool },
}
4411
4412impl HostTask {
4413    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4414        Self {
4415            common: WaitableCommon::default(),
4416            call_context: CallContext::default(),
4417            caller,
4418            state,
4419        }
4420    }
4421}
4422
4423impl TableDebug for HostTask {
4424    fn type_name() -> &'static str {
4425        "HostTask"
4426    }
4427}
4428
/// A boxed, `Send + Sync` closure which takes the store, an `Event`, and a
/// `u32`, and produces a `u32`.
///
/// NOTE(review): presumably this wraps a guest async `callback` function,
/// with the `u32` argument being a waitable handle and the returned `u32` the
/// callback's result code -- confirm against the call sites.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4430
/// Represents the caller of a given guest task.
///
/// A guest task is called either by the host or by another guest thread.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to an entirely unrelated
        /// top-level component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling thread, qualified by the task it belongs to.
        thread: QualifiedThreadId,
    },
}
4451
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// The closure used to lift the result.
    // NOTE(review): `RawLift`'s exact signature is not visible here.
    lift: RawLift,
    /// Index of a tuple type.
    // NOTE(review): presumably the type a `task.return` call is validated
    // against -- confirm.
    ty: TypeTupleIndex,
    /// Pointer to a linear memory definition, if there is one.
    // NOTE(review): presumably the memory results are lifted from -- confirm.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding option.
    // NOTE(review): presumably applied when lifting strings in the result --
    // confirm.
    string_encoding: StringEncoding,
}
4460
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// Table ID of the task to which `thread` belongs.
    task: TableId<GuestTask>,
    /// Table ID of the guest thread itself.
    thread: TableId<GuestThread>,
}
4470
4471impl QualifiedThreadId {
4472    fn qualify(
4473        state: &mut ConcurrentState,
4474        thread: TableId<GuestThread>,
4475    ) -> Result<QualifiedThreadId> {
4476        Ok(QualifiedThreadId {
4477            task: state.get_mut(thread)?.parent_task,
4478            thread,
4479        })
4480    }
4481}
4482
4483impl fmt::Debug for QualifiedThreadId {
4484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4485        f.debug_tuple("QualifiedThreadId")
4486            .field(&self.task.rep())
4487            .field(&self.thread.rep())
4488            .finish()
4489    }
4490}
4491
/// The execution state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`,
    /// holding the start function that was supplied for it.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    // NOTE(review): presumably the thread is currently executing -- confirm.
    Running,
    // NOTE(review): presumably the thread is suspended on the contained fiber
    // and stays so until explicitly resumed -- confirm.
    Suspended(StoreFiber<'static>),
    /// The thread's fiber is held here along with whether it is `cancellable`.
    /// `subtask.cancel` treats `cancellable: true` as a thread in a
    /// cancellable yield and yields back to it.
    Ready {
        fiber: StoreFiber<'static>,
        cancellable: bool,
    },
    // NOTE(review): presumably the thread has finished running -- confirm.
    Completed,
}
/// State tracked for a single guest thread, which always belongs to a guest
/// task (see `parent_task`).
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
}
4522
4523impl GuestThread {
4524    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4525    /// handle.
4526    fn from_instance(
4527        state: Pin<&mut ComponentInstance>,
4528        caller_instance: RuntimeComponentInstanceIndex,
4529        guest_thread: u32,
4530    ) -> Result<TableId<Self>> {
4531        let rep = state.instance_states().0[caller_instance]
4532            .thread_handle_table()
4533            .guest_thread_rep(guest_thread)?;
4534        Ok(TableId::new(rep))
4535    }
4536
4537    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4538        let sync_call_set = state.push(WaitableSet {
4539            is_sync_call_set: true,
4540            ..WaitableSet::default()
4541        })?;
4542        Ok(Self {
4543            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4544            parent_task,
4545            wake_on_cancel: None,
4546            state: GuestThreadState::NotStartedImplicit,
4547            instance_rep: None,
4548            sync_call_set,
4549        })
4550    }
4551
4552    fn new_explicit(
4553        state: &mut ConcurrentState,
4554        parent_task: TableId<GuestTask>,
4555        start_func: Box<
4556            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4557        >,
4558    ) -> Result<Self> {
4559        let sync_call_set = state.push(WaitableSet {
4560            is_sync_call_set: true,
4561            ..WaitableSet::default()
4562        })?;
4563        Ok(Self {
4564            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4565            parent_task,
4566            wake_on_cancel: None,
4567            state: GuestThreadState::NotStartedExplicit(start_func),
4568            instance_rep: None,
4569            sync_call_set,
4570        })
4571    }
4572}
4573
4574impl TableDebug for GuestThread {
4575    fn type_name() -> &'static str {
4576        "GuestThread"
4577    }
4578}
4579
4580enum SyncResult {
4581    NotProduced,
4582    Produced(Option<ValRaw>),
4583    Taken,
4584}
4585
4586impl SyncResult {
4587    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4588        Ok(match mem::replace(self, SyncResult::Taken) {
4589            SyncResult::NotProduced => None,
4590            SyncResult::Produced(val) => Some(val),
4591            SyncResult::Taken => {
4592                bail_bug!("attempted to take a synchronous result that was already taken")
4593            }
4594        })
4595    }
4596}
4597
/// The state of the host future that represents a guest task, if there is
/// one; see `GuestTask::host_future_state`.
#[derive(Debug)]
enum HostFutureState {
    /// The task has no associated host future (e.g. the caller is a guest, or
    /// a host caller which did not create a future).
    NotApplicable,
    /// The host future exists and has not been dropped; the task is not ready
    /// to be deleted while in this state.
    Live,
    /// NOTE(review): presumably set once the host future has been dropped --
    /// the transition is not visible from this part of the file.
    Dropped,
}
4604
4605/// Represents a pending guest task.
4606pub(crate) struct GuestTask {
4607    /// See `WaitableCommon`
4608    common: WaitableCommon,
4609    /// Closure to lower the parameters passed to this task.
4610    lower_params: Option<RawLower>,
4611    /// See `LiftResult`
4612    lift_result: Option<LiftResult>,
4613    /// A place to stash the type-erased lifted result if it can't be delivered
4614    /// immediately.
4615    result: Option<LiftedResult>,
4616    /// Closure to call the callback function for an async-lifted export, if
4617    /// provided.
4618    callback: Option<CallbackFn>,
4619    /// See `Caller`
4620    caller: Caller,
4621    /// Borrow state for this task.
4622    ///
4623    /// Keeps track of `borrow<T>` received to this task to ensure that
4624    /// everything is dropped by the time it exits.
4625    call_context: CallContext,
4626    /// A place to stash the lowered result for a sync-to-async call until it
4627    /// can be returned to the caller.
4628    sync_result: SyncResult,
4629    /// Whether or not the task has been cancelled (i.e. whether the task is
4630    /// permitted to call `task.cancel`).
4631    cancel_sent: bool,
4632    /// Whether or not we've sent a `Status::Starting` event to any current or
4633    /// future waiters for this waitable.
4634    starting_sent: bool,
4635    /// The runtime instance to which the exported function for this guest task
4636    /// belongs.
4637    ///
4638    /// Note that the task may do a sync->sync call via a fused adapter which
4639    /// results in that task executing code in a different instance, and it may
4640    /// call host functions and intrinsics from that other instance.
4641    instance: RuntimeInstance,
4642    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4643    /// delivered to this task.
4644    event: Option<Event>,
4645    /// Whether or not the task has exited.
4646    exited: bool,
4647    /// Threads belonging to this task
4648    threads: HashSet<TableId<GuestThread>>,
4649    /// The state of the host future that represents an async task, which must
4650    /// be dropped before we can delete the task.
4651    host_future_state: HostFutureState,
4652    /// Indicates whether this task was created for a call to an async-lifted
4653    /// export.
4654    async_function: bool,
4655
4656    decremented_interesting_task_count: bool,
4657}
4658
4659impl GuestTask {
4660    fn already_lowered_parameters(&self) -> bool {
4661        // We reset `lower_params` after we lower the parameters
4662        self.lower_params.is_none()
4663    }
4664
4665    fn returned_or_cancelled(&self) -> bool {
4666        // We reset `lift_result` after we return or exit
4667        self.lift_result.is_none()
4668    }
4669
4670    fn ready_to_delete(&self) -> bool {
4671        let threads_completed = self.threads.is_empty();
4672        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4673        let pending_completion_event = matches!(
4674            self.common.event,
4675            Some(Event::Subtask {
4676                status: Status::Returned | Status::ReturnCancelled
4677            })
4678        );
4679        let ready = threads_completed
4680            && !has_sync_result
4681            && !pending_completion_event
4682            && !matches!(self.host_future_state, HostFutureState::Live);
4683        log::trace!(
4684            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4685            threads_completed,
4686            has_sync_result,
4687            pending_completion_event,
4688            self.host_future_state
4689        );
4690        ready
4691    }
4692
4693    fn new(
4694        state: &mut ConcurrentState,
4695        lower_params: RawLower,
4696        lift_result: LiftResult,
4697        caller: Caller,
4698        callback: Option<CallbackFn>,
4699        instance: RuntimeInstance,
4700        async_function: bool,
4701    ) -> Result<QualifiedThreadId> {
4702        let host_future_state = match &caller {
4703            Caller::Guest { .. } => HostFutureState::NotApplicable,
4704            Caller::Host {
4705                host_future_present,
4706                ..
4707            } => {
4708                if *host_future_present {
4709                    HostFutureState::Live
4710                } else {
4711                    HostFutureState::NotApplicable
4712                }
4713            }
4714        };
4715        let task = state.push(Self {
4716            common: WaitableCommon::default(),
4717            lower_params: Some(lower_params),
4718            lift_result: Some(lift_result),
4719            result: None,
4720            callback,
4721            caller,
4722            call_context: CallContext::default(),
4723            sync_result: SyncResult::NotProduced,
4724            cancel_sent: false,
4725            starting_sent: false,
4726            instance,
4727            event: None,
4728            exited: false,
4729            threads: HashSet::new(),
4730            host_future_state,
4731            async_function,
4732            decremented_interesting_task_count: false,
4733        })?;
4734        let new_thread = GuestThread::new_implicit(state, task)?;
4735        let thread = state.push(new_thread)?;
4736        state.get_mut(task)?.threads.insert(thread);
4737        state.interesting_tasks += 1;
4738        Ok(QualifiedThreadId { task, thread })
4739    }
4740}
4741
4742impl TableDebug for GuestTask {
4743    fn type_name() -> &'static str {
4744        "GuestTask"
4745    }
4746}
4747
4748/// Represents state common to all kinds of waitables.
4749#[derive(Default)]
4750struct WaitableCommon {
4751    /// The currently pending event for this waitable, if any.
4752    event: Option<Event>,
4753    /// The set to which this waitable belongs, if any.
4754    set: Option<TableId<WaitableSet>>,
4755    /// The handle with which the guest refers to this waitable, if any.
4756    handle: Option<u32>,
4757}
4758
4759/// Represents a Component Model Async `waitable`.
4760#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4761enum Waitable {
4762    /// A host task
4763    Host(TableId<HostTask>),
4764    /// A guest task
4765    Guest(TableId<GuestTask>),
4766    /// The read or write end of a stream or future
4767    Transmit(TableId<TransmitHandle>),
4768}
4769
4770impl Waitable {
4771    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4772    /// handle.
4773    fn from_instance(
4774        state: Pin<&mut ComponentInstance>,
4775        caller_instance: RuntimeComponentInstanceIndex,
4776        waitable: u32,
4777    ) -> Result<Self> {
4778        use crate::runtime::vm::component::Waitable;
4779
4780        let (waitable, kind) = state.instance_states().0[caller_instance]
4781            .handle_table()
4782            .waitable_rep(waitable)?;
4783
4784        Ok(match kind {
4785            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4786            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4787            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4788        })
4789    }
4790
4791    /// Retrieve the host-visible identifier for this `Waitable`.
4792    fn rep(&self) -> u32 {
4793        match self {
4794            Self::Host(id) => id.rep(),
4795            Self::Guest(id) => id.rep(),
4796            Self::Transmit(id) => id.rep(),
4797        }
4798    }
4799
4800    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4801    /// remove it from any set it may currently belong to (when `set` is
4802    /// `None`).
4803    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4804        log::trace!("waitable {self:?} join set {set:?}");
4805
4806        let old = mem::replace(&mut self.common(state)?.set, set);
4807
4808        if let Some(old) = old {
4809            match *self {
4810                Waitable::Host(id) => state.remove_child(id, old),
4811                Waitable::Guest(id) => state.remove_child(id, old),
4812                Waitable::Transmit(id) => state.remove_child(id, old),
4813            }?;
4814
4815            state.get_mut(old)?.ready.remove(self);
4816        }
4817
4818        if let Some(set) = set {
4819            match *self {
4820                Waitable::Host(id) => state.add_child(id, set),
4821                Waitable::Guest(id) => state.add_child(id, set),
4822                Waitable::Transmit(id) => state.add_child(id, set),
4823            }?;
4824
4825            if self.common(state)?.event.is_some() {
4826                self.mark_ready(state)?;
4827            }
4828        }
4829
4830        Ok(())
4831    }
4832
4833    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4834    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4835        Ok(match self {
4836            Self::Host(id) => &mut state.get_mut(*id)?.common,
4837            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4838            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4839        })
4840    }
4841
4842    /// Set or clear the pending event for this waitable and either deliver it
4843    /// to the first waiter, if any, or mark it as ready to be delivered to the
4844    /// next waiter that arrives.
4845    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4846        log::trace!("set event for {self:?}: {event:?}");
4847        self.common(state)?.event = event;
4848        self.mark_ready(state)
4849    }
4850
4851    /// Take the pending event from this waitable, leaving `None` in its place.
4852    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4853        let common = self.common(state)?;
4854        let event = common.event.take();
4855        if let Some(set) = self.common(state)?.set {
4856            state.get_mut(set)?.ready.remove(self);
4857        }
4858
4859        Ok(event)
4860    }
4861
4862    /// Deliver the current event for this waitable to the first waiter, if any,
4863    /// or else mark it as ready to be delivered to the next waiter that
4864    /// arrives.
4865    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4866        if let Some(set) = self.common(state)?.set {
4867            state.get_mut(set)?.ready.insert(*self);
4868            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4869                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4870                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4871
4872                let item = match mode {
4873                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4874                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4875                        state.get_mut(thread.task)?.instance.index,
4876                        GuestCall {
4877                            thread,
4878                            kind: GuestCallKind::DeliverEvent {
4879                                instance,
4880                                set: Some(set),
4881                            },
4882                        },
4883                    ),
4884                };
4885                state.push_high_priority(item);
4886            }
4887        }
4888        Ok(())
4889    }
4890
4891    /// Remove this waitable from the instance's rep table.
4892    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4893        match self {
4894            Self::Host(task) => {
4895                log::trace!("delete host task {task:?}");
4896                state.delete(*task)?;
4897            }
4898            Self::Guest(task) => {
4899                log::trace!("delete guest task {task:?}");
4900                let task = state.delete(*task)?;
4901
4902                // When a guest task is created it increments the
4903                // `ConcurrentState::interesting_tasks` counter, and that needs
4904                // to be paired with a decrement. There are a few situations in
4905                // which the decrement needs to happen which don't all funnel
4906                // through here, so in lieu of that at least try to catch issues
4907                // where we forgot to do a decrement.
4908                debug_assert!(task.decremented_interesting_task_count);
4909            }
4910            Self::Transmit(task) => {
4911                state.delete(*task)?;
4912            }
4913        }
4914
4915        Ok(())
4916    }
4917}
4918
4919impl fmt::Debug for Waitable {
4920    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4921        match self {
4922            Self::Host(id) => write!(f, "{id:?}"),
4923            Self::Guest(id) => write!(f, "{id:?}"),
4924            Self::Transmit(id) => write!(f, "{id:?}"),
4925        }
4926    }
4927}
4928
4929/// Represents a Component Model Async `waitable-set`.
4930#[derive(Default)]
4931struct WaitableSet {
4932    /// Which waitables in this set have pending events, if any.
4933    ready: BTreeSet<Waitable>,
4934    /// Which guest threads are currently waiting on this set, if any.
4935    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4936    /// Whether this set is a synthetic, internal one meant for handling
4937    /// synchronous calls.
4938    is_sync_call_set: bool,
4939}
4940
4941impl TableDebug for WaitableSet {
4942    fn type_name() -> &'static str {
4943        "WaitableSet"
4944    }
4945}
4946
4947/// Type-erased closure to lower the parameters for a guest task.
4948type RawLower =
4949    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4950
4951/// Type-erased closure to lift the result for a guest task.
4952type RawLift = Box<
4953    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4954>;
4955
4956/// Type erased result of a guest task which may be downcast to the expected
4957/// type by a host caller (or simply ignored in the case of a guest caller; see
4958/// `DummyResult`).
4959type LiftedResult = Box<dyn Any + Send + Sync>;
4960
/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
///
/// This is a zero-sized placeholder which carries no data.
struct DummyResult;
4964
4965/// Represents the Component Model Async state of a (sub-)component instance.
4966#[derive(Default)]
4967pub struct ConcurrentInstanceState {
4968    /// Whether backpressure is set for this instance (enabled if >0)
4969    backpressure: u16,
4970    /// Whether this instance can be entered
4971    do_not_enter: bool,
4972    /// Pending calls for this instance which require `Self::backpressure` to be
4973    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4974    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4975}
4976
4977impl ConcurrentInstanceState {
4978    pub fn pending_is_empty(&self) -> bool {
4979        self.pending.is_empty()
4980    }
4981}
4982
4983#[derive(Debug, Copy, Clone)]
4984pub(crate) enum CurrentThread {
4985    Guest(QualifiedThreadId),
4986    Host(TableId<HostTask>),
4987    None,
4988}
4989
4990impl CurrentThread {
4991    fn guest(&self) -> Option<&QualifiedThreadId> {
4992        match self {
4993            Self::Guest(id) => Some(id),
4994            _ => None,
4995        }
4996    }
4997
4998    fn host(&self) -> Option<TableId<HostTask>> {
4999        match self {
5000            Self::Host(id) => Some(*id),
5001            _ => None,
5002        }
5003    }
5004
5005    fn is_none(&self) -> bool {
5006        matches!(self, Self::None)
5007    }
5008}
5009
5010impl From<QualifiedThreadId> for CurrentThread {
5011    fn from(id: QualifiedThreadId) -> Self {
5012        Self::Guest(id)
5013    }
5014}
5015
5016impl From<TableId<HostTask>> for CurrentThread {
5017    fn from(id: TableId<HostTask>) -> Self {
5018        Self::Host(id)
5019    }
5020}
5021
5022/// Represents the Component Model Async state of a store.
5023pub struct ConcurrentState {
5024    /// The currently running thread, if any.
5025    current_thread: CurrentThread,
5026
5027    /// The set of pending host and background tasks, if any.
5028    ///
5029    /// See `ComponentInstance::poll_until` for where we temporarily take this
5030    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
5031    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5032    /// The table of waitables, waitable sets, etc.
5033    table: AlwaysMut<ResourceTable>,
5034    /// The "high priority" work queue for this store's event loop.
5035    high_priority: Vec<WorkItem>,
5036    /// The "low priority" work queue for this store's event loop.
5037    low_priority: VecDeque<WorkItem>,
5038    /// A place to stash the reason a fiber is suspending so that the code which
5039    /// resumed it will know under what conditions the fiber should be resumed
5040    /// again.
5041    suspend_reason: Option<SuspendReason>,
5042    /// A cached fiber which is waiting for work to do.
5043    ///
5044    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
5045    worker: Option<StoreFiber<'static>>,
5046    /// A place to stash the work item for which we're resuming a worker fiber.
5047    worker_item: Option<WorkerItem>,
5048
5049    /// Reference counts for all component error contexts
5050    ///
5051    /// NOTE: it is possible the global ref count to be *greater* than the sum of
5052    /// (sub)component ref counts as tracked by `error_context_tables`, for
5053    /// example when the host holds one or more references to error contexts.
5054    ///
5055    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
5056    /// component-wide representation) of the index into concurrent state for a given
5057    /// stored `ErrorContext`.
5058    ///
5059    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
5060    /// as a `TableId<ErrorContextState>`.
5061    global_error_context_ref_counts:
5062        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5063
5064    /// The number of "interesting tasks" currently executing in the store.
5065    ///
5066    /// This tracks the concept of a component instance lifetime as defined in
5067    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
5068    /// all tasks currently increment this counter which then gets decremented
5069    /// when they exit. In the future some tasks might not increment this
5070    /// counter, but for now all do.
5071    ///
5072    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
5073    /// inform the embedder when all tasks have completed. This is then
5074    /// used in wasmtime-wasi-http, for example, to know when an instance is
5075    /// idle.
5076    interesting_tasks: usize,
5077
5078    /// Single waker to notify when `interesting_tasks` reaches 0.
5079    ///
5080    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
5081    interesting_tasks_empty_waker: Option<Waker>,
5082}
5083
5084impl Default for ConcurrentState {
5085    fn default() -> Self {
5086        Self {
5087            current_thread: CurrentThread::None,
5088            table: AlwaysMut::new(ResourceTable::new()),
5089            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5090            high_priority: Vec::new(),
5091            low_priority: VecDeque::new(),
5092            suspend_reason: None,
5093            worker: None,
5094            worker_item: None,
5095            global_error_context_ref_counts: BTreeMap::new(),
5096            interesting_tasks: 0,
5097            interesting_tasks_empty_waker: None,
5098        }
5099    }
5100}
5101
5102impl ConcurrentState {
5103    /// Take ownership of any fibers and futures owned by this object.
5104    ///
5105    /// This should be used when disposing of the `Store` containing this object
5106    /// in order to gracefully resolve any and all fibers using
5107    /// `StoreFiber::dispose`.  This is necessary to avoid possible
5108    /// use-after-free bugs due to fibers which may still have access to the
5109    /// `Store`.
5110    ///
5111    /// Additionally, the futures collected with this function should be dropped
5112    /// within a `tls::set` call, which will ensure than any futures closing
5113    /// over an `&Accessor` will have access to the store when dropped, allowing
5114    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
5115    /// panicking.
5116    ///
5117    /// Note that this will leave the object in an inconsistent and unusable
5118    /// state, so it should only be used just prior to dropping it.
5119    pub(crate) fn take_fibers_and_futures(
5120        &mut self,
5121        fibers: &mut Vec<StoreFiber<'static>>,
5122        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5123    ) {
5124        for entry in self.table.get_mut().iter_mut() {
5125            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5126                for mode in mem::take(&mut set.waiting).into_values() {
5127                    if let WaitMode::Fiber(fiber) = mode {
5128                        fibers.push(fiber);
5129                    }
5130                }
5131            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5132                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5133                    mem::replace(&mut thread.state, GuestThreadState::Completed)
5134                {
5135                    fibers.push(fiber);
5136                }
5137            }
5138        }
5139
5140        if let Some(fiber) = self.worker.take() {
5141            fibers.push(fiber);
5142        }
5143
5144        let mut handle_item = |item| match item {
5145            WorkItem::ResumeFiber(fiber) => {
5146                fibers.push(fiber);
5147            }
5148            WorkItem::PushFuture(future) => {
5149                self.futures
5150                    .get_mut()
5151                    .as_mut()
5152                    .unwrap()
5153                    .push(future.into_inner());
5154            }
5155            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5156            }
5157        };
5158
5159        for item in mem::take(&mut self.high_priority) {
5160            handle_item(item);
5161        }
5162        for item in mem::take(&mut self.low_priority) {
5163            handle_item(item);
5164        }
5165
5166        if let Some(them) = self.futures.get_mut().take() {
5167            futures.push(them);
5168        }
5169    }
5170
5171    fn push<V: Send + Sync + 'static>(
5172        &mut self,
5173        value: V,
5174    ) -> Result<TableId<V>, ResourceTableError> {
5175        self.table.get_mut().push(value).map(TableId::from)
5176    }
5177
5178    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5179        self.table.get_mut().get_mut(&Resource::from(id))
5180    }
5181
5182    pub fn add_child<T: 'static, U: 'static>(
5183        &mut self,
5184        child: TableId<T>,
5185        parent: TableId<U>,
5186    ) -> Result<(), ResourceTableError> {
5187        self.table
5188            .get_mut()
5189            .add_child(Resource::from(child), Resource::from(parent))
5190    }
5191
5192    pub fn remove_child<T: 'static, U: 'static>(
5193        &mut self,
5194        child: TableId<T>,
5195        parent: TableId<U>,
5196    ) -> Result<(), ResourceTableError> {
5197        self.table
5198            .get_mut()
5199            .remove_child(Resource::from(child), Resource::from(parent))
5200    }
5201
5202    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5203        self.table.get_mut().delete(Resource::from(id))
5204    }
5205
5206    fn push_future(&mut self, future: HostTaskFuture) {
5207        // Note that we can't directly push to `ConcurrentState::futures` here
5208        // since this may be called from a future that's being polled inside
5209        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5210        // so it has exclusive access while polling it.  Therefore, we push a
5211        // work item to the "high priority" queue, which will actually push to
5212        // `ConcurrentState::futures` later.
5213        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5214    }
5215
5216    fn push_high_priority(&mut self, item: WorkItem) {
5217        log::trace!("push high priority: {item:?}");
5218        self.high_priority.push(item);
5219    }
5220
5221    fn push_low_priority(&mut self, item: WorkItem) {
5222        log::trace!("push low priority: {item:?}");
5223        self.low_priority.push_front(item);
5224    }
5225
5226    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5227        if high_priority {
5228            self.push_high_priority(item);
5229        } else {
5230            self.push_low_priority(item);
5231        }
5232    }
5233
5234    fn promote_instance_local_thread_work_item(
5235        &mut self,
5236        current_instance: RuntimeComponentInstanceIndex,
5237    ) -> bool {
5238        self.promote_work_items_matching(|item: &WorkItem| match item {
5239            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5240                *instance == current_instance
5241            }
5242            _ => false,
5243        })
5244    }
5245
5246    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5247        self.promote_work_items_matching(|item: &WorkItem| match item {
5248            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5249                *t == thread
5250            }
5251            _ => false,
5252        })
5253    }
5254
5255    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5256    where
5257        F: FnMut(&WorkItem) -> bool,
5258    {
5259        // If there's a high-priority work item to resume the current guest thread,
5260        // we don't need to promote anything, but we return true to indicate that
5261        // work is pending for the current instance.
5262        if self.high_priority.iter().any(&mut predicate) {
5263            true
5264        }
5265        // Otherwise, look for a low-priority work item that matches the current
5266        // instance and promote it to high-priority.
5267        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5268            let item = self.low_priority.remove(idx).unwrap();
5269            self.push_high_priority(item);
5270            true
5271        } else {
5272            false
5273        }
5274    }
5275
5276    /// Returns whether there's a pending cancellation on the current guest thread,
5277    /// consuming the event if so.
5278    fn take_pending_cancellation(&mut self) -> Result<bool> {
5279        let thread = self.current_guest_thread()?;
5280        if let Some(event) = self.get_mut(thread.task)?.event.take() {
5281            assert!(matches!(event, Event::Cancelled));
5282            Ok(true)
5283        } else {
5284            Ok(false)
5285        }
5286    }
5287
5288    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5289        if self.may_block(task)? {
5290            Ok(())
5291        } else {
5292            Err(Trap::CannotBlockSyncTask.into())
5293        }
5294    }
5295
5296    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5297        let task = self.get_mut(task)?;
5298        Ok(task.async_function || task.returned_or_cancelled())
5299    }
5300
5301    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5302    /// specified task.
5303    ///
5304    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5305    /// below.
5306    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5307        let (task, is_host) = (task >> 1, task & 1 == 1);
5308        if is_host {
5309            let task: TableId<HostTask> = TableId::new(task);
5310            Ok(&mut self.get_mut(task)?.call_context)
5311        } else {
5312            let task: TableId<GuestTask> = TableId::new(task);
5313            Ok(&mut self.get_mut(task)?.call_context)
5314        }
5315    }
5316
5317    /// Used by `ResourceTables` to record the scope of a borrow to get undone
5318    /// in the future.
5319    pub fn current_call_context_scope_id(&self) -> Result<u32> {
5320        let (bits, is_host) = match self.current_thread {
5321            CurrentThread::Guest(id) => (id.task.rep(), false),
5322            CurrentThread::Host(id) => (id.rep(), true),
5323            CurrentThread::None => bail_bug!("current thread is not set"),
5324        };
5325        assert_eq!((bits << 1) >> 1, bits);
5326        Ok((bits << 1) | u32::from(is_host))
5327    }
5328
5329    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5330        match self.current_thread.guest() {
5331            Some(id) => Ok(*id),
5332            None => bail_bug!("current thread is not a guest thread"),
5333        }
5334    }
5335
5336    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5337        match self.current_thread.host() {
5338            Some(id) => Ok(id),
5339            None => bail_bug!("current thread is not a host thread"),
5340        }
5341    }
5342
5343    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5344        match self.futures.get_mut().as_mut() {
5345            Some(f) => Ok(f),
5346            None => bail_bug!("futures field of concurrent state is currently taken"),
5347        }
5348    }
5349
5350    pub(crate) fn table(&mut self) -> &mut ResourceTable {
5351        self.table.get_mut()
5352    }
5353}
5354
5355/// Provide a type hint to compiler about the shape of a parameter lower
5356/// closure.
5357fn for_any_lower<
5358    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5359>(
5360    fun: F,
5361) -> F {
5362    fun
5363}
5364
5365/// Provide a type hint to compiler about the shape of a result lift closure.
5366fn for_any_lift<
5367    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5368>(
5369    fun: F,
5370) -> F {
5371    fun
5372}
5373
5374/// Wrap the specified future in a `poll_fn` which asserts that the future is
5375/// only polled from the event loop of the specified `Store`.
5376///
5377/// See `StoreContextMut::run_concurrent` for details.
5378fn checked<F: Future + Send + 'static>(
5379    id: StoreId,
5380    fut: F,
5381) -> impl Future<Output = F::Output> + Send + 'static {
5382    async move {
5383        let mut fut = pin!(fut);
5384        future::poll_fn(move |cx| {
5385            let message = "\
5386                `Future`s which depend on asynchronous component tasks, streams, or \
5387                futures to complete may only be polled from the event loop of the \
5388                store to which they belong.  Please use \
5389                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5390            ";
5391            tls::try_get(|store| {
5392                let matched = match store {
5393                    tls::TryGet::Some(store) => store.id() == id,
5394                    tls::TryGet::Taken | tls::TryGet::None => false,
5395                };
5396
5397                if !matched {
5398                    panic!("{message}")
5399                }
5400            });
5401            fut.as_mut().poll(cx)
5402        })
5403        .await
5404    }
5405}
5406
5407/// Assert that `StoreContextMut::run_concurrent` has not been called from
5408/// within an store's event loop.
5409fn check_recursive_run() {
5410    tls::try_get(|store| {
5411        if !matches!(store, tls::TryGet::None) {
5412            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5413        }
5414    });
5415}
5416
/// Splits a packed callback code into its two components.
///
/// Returns `(low, high)`, where `low` is the lowest four bits of `code` and
/// `high` is the remaining upper bits shifted down by four.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    let low = code & ((1 << LOW_BITS) - 1);
    let high = code >> LOW_BITS;
    (low, high)
}
5420
5421/// Helper struct for packaging parameters to be passed to
5422/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5423/// `waitable-set.poll`.
5424struct WaitableCheckParams {
5425    set: TableId<WaitableSet>,
5426    options: OptionsIndex,
5427    payload: u32,
5428}
5429
/// Selects which intrinsic `ComponentInstance::waitable_check` is servicing.
enum WaitableCheck {
    /// The call is a `waitable-set.wait`.
    Wait,
    /// The call is a `waitable-set.poll`.
    Poll,
}
5436
5437/// Represents a guest task called from the host, prepared using `prepare_call`.
5438pub(crate) struct PreparedCall<R> {
5439    /// The guest export to be called
5440    handle: Func,
5441    /// The guest thread created by `prepare_call`
5442    thread: QualifiedThreadId,
5443    /// The number of lowered core Wasm parameters to pass to the call.
5444    param_count: usize,
5445    /// The `oneshot::Receiver` to which the result of the call will be
5446    /// delivered when it is available.
5447    rx: oneshot::Receiver<LiftedResult>,
5448    /// The instance that this call is prepared for.
5449    runtime_instance: RuntimeInstance,
5450    _phantom: PhantomData<R>,
5451}
5452
5453impl<R> PreparedCall<R> {
5454    /// Get a copy of the `TaskId` for this `PreparedCall`.
5455    pub(crate) fn task_id(&self) -> TaskId {
5456        TaskId {
5457            task: self.thread.task,
5458            runtime_instance: self.runtime_instance,
5459        }
5460    }
5461}
5462
5463/// Represents a task created by `prepare_call`.
5464pub(crate) struct TaskId {
5465    task: TableId<GuestTask>,
5466    runtime_instance: RuntimeInstance,
5467}
5468
5469impl TaskId {
5470    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5471    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5472    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5473    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5474    /// and delete the task when all threads are done.
5475    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5476        let task = store.concurrent_state_mut().get_mut(self.task)?;
5477        let delete = if !task.already_lowered_parameters() {
5478            store.cancel_guest_subtask_without_lowered_parameters(
5479                self.runtime_instance,
5480                self.task,
5481            )?;
5482            true
5483        } else {
5484            task.host_future_state = HostFutureState::Dropped;
5485            task.ready_to_delete()
5486        };
5487        if delete {
5488            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut())?
5489        }
5490        Ok(())
5491    }
5492}
5493
5494/// Prepare a call to the specified exported Wasm function, providing functions
5495/// for lowering the parameters and lifting the result.
5496///
5497/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5498/// loop, use `queue_call`.
5499pub(crate) fn prepare_call<T, R>(
5500    mut store: StoreContextMut<T>,
5501    handle: Func,
5502    param_count: usize,
5503    host_future_present: bool,
5504    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5505    + Send
5506    + Sync
5507    + 'static,
5508    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5509    + Send
5510    + Sync
5511    + 'static,
5512) -> Result<PreparedCall<R>> {
5513    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5514
5515    let instance = handle.instance().id().get(store.0);
5516    let options = &instance.component().env_component().options[options];
5517    let ty = &instance.component().types()[ty];
5518    let async_function = ty.async_;
5519    let task_return_type = ty.results;
5520    let component_instance = raw_options.instance;
5521    let callback = options.callback.map(|i| instance.runtime_callback(i));
5522    let memory = options
5523        .memory()
5524        .map(|i| instance.runtime_memory(i))
5525        .map(SendSyncPtr::new);
5526    let string_encoding = options.string_encoding;
5527    let token = StoreToken::new(store.as_context_mut());
5528    let state = store.0.concurrent_state_mut();
5529
5530    let (tx, rx) = oneshot::channel();
5531
5532    let instance = handle.instance().runtime_instance(component_instance);
5533    let caller = state.current_thread;
5534    let thread = GuestTask::new(
5535        state,
5536        Box::new(for_any_lower(move |store, params| {
5537            lower_params(handle, token.as_context_mut(store), params)
5538        })),
5539        LiftResult {
5540            lift: Box::new(for_any_lift(move |store, result| {
5541                lift_result(handle, store, result)
5542            })),
5543            ty: task_return_type,
5544            memory,
5545            string_encoding,
5546        },
5547        Caller::Host {
5548            tx: Some(tx),
5549            host_future_present,
5550            caller,
5551        },
5552        callback.map(|callback| {
5553            let callback = SendSyncPtr::new(callback);
5554            let instance = handle.instance();
5555            Box::new(move |store: &mut dyn VMStore, event, handle| {
5556                let store = token.as_context_mut(store);
5557                // SAFETY: Per the contract of `prepare_call`, the callback
5558                // will remain valid at least as long is this task exists.
5559                unsafe { instance.call_callback(store, callback, event, handle) }
5560            }) as CallbackFn
5561        }),
5562        instance,
5563        async_function,
5564    )?;
5565
5566    if !store.0.may_enter(instance)? {
5567        bail!(Trap::CannotEnterComponent);
5568    }
5569
5570    Ok(PreparedCall {
5571        handle,
5572        thread,
5573        param_count,
5574        runtime_instance: instance,
5575        rx,
5576        _phantom: PhantomData,
5577    })
5578}
5579
5580/// Queue a call previously prepared using `prepare_call` to be run as part of
5581/// the associated `ComponentInstance`'s event loop.
5582///
5583/// The returned future will resolve to the result once it is available, but
5584/// must only be polled via the instance's event loop. See
5585/// `StoreContextMut::run_concurrent` for details.
5586pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5587    mut store: StoreContextMut<T>,
5588    prepared: PreparedCall<R>,
5589) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5590    let PreparedCall {
5591        handle,
5592        thread,
5593        param_count,
5594        rx,
5595        ..
5596    } = prepared;
5597
5598    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5599
5600    Ok(checked(
5601        store.0.id(),
5602        rx.map(move |result| match result {
5603            Ok(r) => match r.downcast() {
5604                Ok(r) => Ok(*r),
5605                Err(_) => bail_bug!("wrong type of value produced"),
5606            },
5607            Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5608        }),
5609    ))
5610}
5611
5612/// Queue a call previously prepared using `prepare_call` to be run as part of
5613/// the associated `ComponentInstance`'s event loop.
5614fn queue_call0<T: 'static>(
5615    store: StoreContextMut<T>,
5616    handle: Func,
5617    guest_thread: QualifiedThreadId,
5618    param_count: usize,
5619) -> Result<()> {
5620    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5621    let is_concurrent = raw_options.async_;
5622    let callback = raw_options.callback;
5623    let instance = handle.instance();
5624    let callee = handle.lifted_core_func(store.0);
5625    let post_return = handle.post_return_core_func(store.0);
5626    let callback = callback.map(|i| {
5627        let instance = instance.id().get(store.0);
5628        SendSyncPtr::new(instance.runtime_callback(i))
5629    });
5630
5631    log::trace!("queueing call {guest_thread:?}");
5632
5633    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5634    // (with signatures appropriate for this call) and will remain valid as
5635    // long as this instance is valid.
5636    unsafe {
5637        instance.queue_call(
5638            store,
5639            guest_thread,
5640            SendSyncPtr::new(callee),
5641            param_count,
5642            1,
5643            is_concurrent,
5644            callback,
5645            post_return.map(SendSyncPtr::new),
5646        )
5647    }
5648}