Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func, call_post_return};
54use crate::component::{
55    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56};
57use crate::fiber::{self, StoreFiber, StoreFiberYield};
58use crate::prelude::*;
59use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62use crate::{
63    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64    bail, error::format_err,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::Trap;
87use wasmtime_environ::component::{
88    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93};
94use wasmtime_environ::packed_option::ReservedValue;
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// This is the all-ones sentinel (`u32::MAX`), distinguishable from any
/// packed status/result value.
const BLOCKED: u32 = 0xffff_ffff;
115
/// Corresponds to `CallState` in the upstream spec.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle into the
    /// single 32-bit value required by the canonical ABI.
    ///
    /// Layout: bits 0-3 hold the status; bits 4-31 hold the waitable handle
    /// (zero when absent).
    ///
    /// # Panics
    ///
    /// Panics if a waitable is present for `Status::Returned` (or absent for
    /// any other status), or if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // `Returned` is the only status delivered without a waitable.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let handle = waitable.unwrap_or(0);
        // The handle must fit in the upper 28 bits.
        assert!(handle < (1 << 28));
        (self as u32) | (handle << 4)
    }
}
139
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event (lowered as `EVENT_NONE`; see `Event::parts`).
    None,
    /// The current task was cancelled (lowered as `EVENT_CANCELLED`).
    Cancelled,
    /// A subtask status change (lowered as `EVENT_SUBTASK`).
    Subtask {
        /// The subtask's new status; lowered as the event payload.
        status: Status,
    },
    /// An event for a `stream.read` operation (lowered as `EVENT_STREAM_READ`).
    StreamRead {
        /// Result code, lowered as the event payload via `ReturnCode::encode`.
        code: ReturnCode,
        /// NOTE(review): presumably identifies a still-pending stream
        /// operation (stream table index plus handle); not lowered by
        /// `Event::parts` -- confirm semantics against this field's users.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// An event for a `stream.write` operation (lowered as `EVENT_STREAM_WRITE`).
    StreamWrite {
        /// Result code, lowered as the event payload via `ReturnCode::encode`.
        code: ReturnCode,
        /// See `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// An event for a `future.read` operation (lowered as `EVENT_FUTURE_READ`).
    FutureRead {
        /// Result code, lowered as the event payload via `ReturnCode::encode`.
        code: ReturnCode,
        /// See `StreamRead::pending` (here for futures rather than streams).
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// An event for a `future.write` operation (lowered as `EVENT_FUTURE_WRITE`).
    FutureWrite {
        /// Result code, lowered as the event payload via `ReturnCode::encode`.
        code: ReturnCode,
        /// See `StreamRead::pending` (here for futures rather than streams).
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
166
167impl Event {
168    /// Lower this event to core Wasm integers for delivery to the guest.
169    ///
170    /// Note that the waitable handle, if any, is assumed to be lowered
171    /// separately.
172    fn parts(self) -> (u32, u32) {
173        const EVENT_NONE: u32 = 0;
174        const EVENT_SUBTASK: u32 = 1;
175        const EVENT_STREAM_READ: u32 = 2;
176        const EVENT_STREAM_WRITE: u32 = 3;
177        const EVENT_FUTURE_READ: u32 = 4;
178        const EVENT_FUTURE_WRITE: u32 = 5;
179        const EVENT_CANCELLED: u32 = 6;
180        match self {
181            Event::None => (EVENT_NONE, 0),
182            Event::Cancelled => (EVENT_CANCELLED, 0),
183            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188        }
189    }
190}
191
/// Corresponds to `CallbackCode` in the spec.
mod callback_code {
    /// Per the spec's `CallbackCode`: the task is finished and may exit.
    pub const EXIT: u32 = 0;
    /// Per the spec's `CallbackCode`: yield to other tasks before re-entry.
    pub const YIELD: u32 = 1;
    /// Per the spec's `CallbackCode`: wait for an event before re-entry.
    pub const WAIT: u32 = 2;
}
198
/// A flag indicating that the callee is an async-lowered export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// This is the `u32` form of the same constant defined in
/// `wasmtime_environ::component`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    // Exclusive handle to the store for the duration of this access.
    store: StoreContextMut<'a, T>,
    // Projection from the store's data type `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
213
impl<'a, T, D> Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Creates a new [`Access`] from its component parts.
    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
        Self { store, get_data }
    }

    /// Get mutable access to the store data.
    pub fn data_mut(&mut self) -> &mut T {
        self.store.data_mut()
    }

    /// Get access to the projected data `D::Data<'_>`, obtained by applying
    /// this accessor's getter to the store data.
    pub fn get(&mut self) -> D::Data<'_> {
        (self.get_data)(self.data_mut())
    }

    /// Spawn a background task.
    ///
    /// See [`Accessor::spawn`] for details.
    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        // Build a fresh `Accessor` for the spawned task; it carries only the
        // store token and the projection function, not the store itself.
        let accessor = Accessor {
            get_data: self.get_data,
            token: StoreToken::new(self.store.as_context_mut()),
        };
        self.store
            .as_context_mut()
            .spawn_with_accessor(accessor, task)
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }
}
256
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    // Delegates to the wrapped `StoreContextMut`.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
268
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    // Delegates to the wrapped `StoreContextMut`.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
278
279/// Provides scoped mutable access to store data in the context of a concurrent
280/// host task future.
281///
282/// This allows multiple host task futures to execute concurrently and access
283/// the store between (but not across) `await` points.
284///
285/// # Rationale
286///
287/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
288/// `D::Data<'_>`. The problem this is solving, however, is that it does not
289/// literally store these values. The basic problem is that when a concurrent
290/// host future is being polled it has access to `&mut T` (and the whole
291/// `Store`) but when it's not being polled it does not have access to these
292/// values. This reflects how the store is only ever polling one future at a
293/// time so the store is effectively being passed between futures.
294///
295/// Rust's `Future` trait, however, has no means of passing a `Store`
296/// temporarily between futures. The [`Context`](std::task::Context) type does
297/// not have the ability to attach arbitrary information to it at this time.
298/// This type, [`Accessor`], is used to bridge this expressivity gap.
299///
300/// The [`Accessor`] type here represents the ability to acquire, temporarily in
301/// a synchronous manner, the current store. The [`Accessor::with`] function
302/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
303/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
304/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
306/// how the store is temporarily made available while a host future is being
307/// polled.
308///
309/// # Implementation
310///
311/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
312/// this type additionally doesn't even have a lifetime parameter. This is
313/// instead a representation of proof of the ability to acquire these while a
314/// future is being polled. Wasmtime will, when it polls a host future,
315/// configure ambient state such that the `Accessor` that a future closes over
316/// will work and be able to access the store.
317///
318/// This has a number of implications for users such as:
319///
320/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
321///   the lifetime of a single future.
322/// * A future is expected to, however, close over an `Accessor` and keep it
323///   alive probably for the duration of the entire future.
324/// * Different host futures will be given different `Accessor`s, and that's
325///   intentional.
326/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
327///   alleviates some otherwise required bounds to be written down.
328///
329/// # Using `Accessor` in `Drop`
330///
331/// The methods on `Accessor` are only expected to work in the context of
332/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
333/// host future can be dropped at any time throughout the system and Wasmtime
334/// store context is not necessarily available at that time. It's recommended to
335/// not use `Accessor` methods in anything connected to a `Drop` implementation
336/// as they will panic and have unintended results. If you run into this though
337/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    // Proof of which store this accessor belongs to; the store itself is
    // recovered from ambient (TLS) state when the accessor is used.
    token: StoreToken<T>,
    // Projection from the store's data type `T` to `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// Note that this returns a borrow: `Accessor` itself is intentionally
    /// not cloneable (see its documentation).
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
// Blanket impl so `&Accessor` (and `&&Accessor`, etc.) can be passed anywhere
// an accessor is expected, simply forwarding through the reference.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
382
// Identity implementation: an `Accessor` is trivially its own accessor.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
391
392// Note that it is intentional at this time that `Accessor` does not actually
393// store `&mut T` or anything similar. This distinctly enables the `Accessor`
394// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395// that matter). This is used to ergonomically simplify bindings where the
396// majority of the time `Accessor` is closed over in a future which then needs
397// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398// you already have to write `T: 'static`...) it helps to avoid this.
399//
400// Note as well that `Accessor` doesn't actually store its data at all. Instead
401// it's more of a "proof" of what can be accessed from TLS. API design around
402// `Accessor` and functions like `Linker::func_wrap_concurrent` are
403// intentionally made to ensure that `Accessor` is ideally only used in the
404// context that TLS variables are actually set. For example host functions are
405// given `&Accessor`, not `Accessor`, and this prevents them from persisting
406// the value outside of a future. Within the future the TLS variables are all
407// guaranteed to be set while the future is being polled.
408//
409// Finally though this is not an ironclad guarantee, but nor does it need to be.
410// The TLS APIs are designed to panic or otherwise model usage where they're
411// called recursively or similar. It's hoped that code cannot be constructed to
412// actually hit this at runtime but this is not a safety requirement at this
413// time.
// Compile-time assertion that `Accessor` is `Send + Sync` even when `T` is
// not `Sync` (here `UnsafeCell<u32>`); see the commentary above.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
418
impl<T> Accessor<T> {
    /// Creates a new `Accessor` for a store with data type `T`, using the
    /// identity projection (i.e. `D = HasSelf<T>`).
    ///
    /// The `token` records which store this accessor belongs to; the store
    /// itself is recovered from ambient (TLS) state when the accessor is
    /// actually used (see [`Accessor::with`]).
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
435
impl<T, D> Accessor<T, D>
where
    D: HasData + ?Sized,
{
    /// Run the specified closure, passing it mutable access to the store.
    ///
    /// This function is one of the main building blocks of the [`Accessor`]
    /// type. This yields synchronous, blocking, access to the store via an
    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
    /// providing the ability to access `D` via [`Access::get`]. Note that the
    /// `fun` here is given only temporary access to the store and `T`/`D`
    /// meaning that the return value `R` here is not allowed to capture borrows
    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
    ///
    /// # Panics
    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example if `with` is called within `fun`,
    /// then this function will panic. It is up to the embedder to ensure that
    /// this does not happen.
    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        // Recover the store from ambient (TLS) state; this is what lets
        // `Accessor` avoid storing `&mut T` directly.
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                get_data: self.get_data,
            })
        })
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }

    /// Changes this accessor to access `D2` instead of the current type
    /// parameter `D`.
    ///
    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
    ///
    /// # Panics
    ///
    /// When using this API the returned value is disconnected from `&self` and
    /// the lifetime binding the `self` argument. An `Accessor` only works
    /// within the context of the closure or async closure that it was
    /// originally given to, however. This means that due to the fact that the
    /// returned value has no lifetime connection it's possible to use the
    /// accessor outside of `&self`, the original accessor, and panic.
    ///
    /// The returned value should only be used within the scope of the original
    /// `Accessor` that `self` refers to.
    pub fn with_getter<D2: HasData>(
        &self,
        get_data: fn(&mut T) -> D2::Data<'_>,
    ) -> Accessor<T, D2> {
        Accessor {
            token: self.token,
            get_data,
        }
    }

    /// Spawn a background task which will receive an `&Accessor<T, D>` and
    /// run concurrently with any other tasks in progress for the current
    /// store.
    ///
    /// This is particularly useful for host functions which return a `stream`
    /// or `future` such that the code to write to the write end of that
    /// `stream` or `future` must run after the function returns.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    ///
    /// # Panics
    ///
    /// Panics if called within a closure provided to the [`Accessor::with`]
    /// function. This can only be called outside an active invocation of
    /// [`Accessor::with`].
    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        // Duplicate this accessor for the spawned task, then use `with` to
        // reach the store and register the task with its event loop.
        let accessor = self.clone_for_spawn();
        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
    }

    /// Duplicates this accessor for handing to a spawned task.
    ///
    /// Deliberately not a public `Clone` impl: accessors are only meant to be
    /// duplicated internally when spawning (see the type-level docs).
    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
        }
    }
}
528
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
547
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw core Wasm values passed as parameters.
        params: Vec<ValRaw>,
        /// Whether the call produces a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw core Wasm values passed as parameters.
        params: Vec<ValRaw>,
        /// Number of raw result values the caller expects.
        result_count: u32,
    },
}
562
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`, holding its suspended
    /// fiber.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export, belonging to the given instance.
    Callback(Instance),
}
571
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The thread that is waiting.
        thread: QualifiedThreadId,
        /// NOTE(review): presumably bypasses the usual "may this thread
        /// block?" validation -- confirm against `suspend`'s implementation.
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        /// The thread that is yielding.
        thread: QualifiedThreadId,
        /// See `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        /// The thread that was suspended.
        thread: QualifiedThreadId,
        /// See `Waiting::skip_may_block_check`.
        skip_may_block_check: bool,
    },
}
597
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Indicates that a new guest task call is pending and should be started
    /// by running the specified closure.  Unlike `StartImplicit`, the closure
    /// does not hand back a follow-up `GuestCall`.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
619
impl fmt::Debug for GuestCallKind {
    // Formats the variant name, plus the instance and waitable set for
    // `DeliverEvent`.  The start closures are opaque and therefore elided.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DeliverEvent { instance, set } => f
                .debug_struct("DeliverEvent")
                .field("instance", instance)
                .field("set", set)
                .finish(),
            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
        }
    }
}
633
/// The target of a suspension intrinsic.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` iff this is `SuspensionTarget::None`.
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::Some(_) | SuspensionTarget::SomeSuspended(_) => false,
        }
    }

    /// Returns `true` iff a target value is present (i.e. not `None`).
    fn is_some(&self) -> bool {
        !self.is_none()
    }
}
650
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The thread on whose behalf the call will run.
    thread: QualifiedThreadId,
    /// What kind of call this is and how to run it.
    kind: GuestCallKind,
}
657
impl GuestCall {
    /// Returns whether or not the call is ready to run.
    ///
    /// A call will not be ready to run if either:
    ///
    /// - the (sub-)component instance to be called has already been entered and
    /// cannot be reentered until an in-progress call completes
    ///
    /// - the call is for a not-yet started task and the (sub-)component
    /// instance to be called has backpressure enabled
    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
        // Look up the (sub-)component instance the task behind this call
        // belongs to, then consult that instance's concurrent state flags.
        let instance = store
            .concurrent_state_mut()
            .get_mut(self.thread.task)?
            .instance;
        let state = store.instance_state(instance).concurrent_state();

        let ready = match &self.kind {
            // Event delivery only requires that the instance be reenterable.
            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
            // Starting a new implicit task additionally requires that the
            // instance not be applying backpressure.
            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
            // Explicit starts are always ready.
            GuestCallKind::StartExplicit(_) => true,
        };
        log::trace!(
            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
            state.do_not_enter,
            state.backpressure
        );
        Ok(ready)
    }
}
688
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary function to run with mutable access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
694
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume, identified by its (sub-)component instance and
    /// qualified thread id.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task, along with the
    /// (sub-)component instance it targets.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
709
impl fmt::Debug for WorkItem {
    // Formats the variant name and any `Debug`-able payloads; futures,
    // fibers, and closures are opaque and therefore elided.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
            Self::ResumeThread(instance, thread) => f
                .debug_tuple("ResumeThread")
                .field(instance)
                .field(thread)
                .finish(),
            Self::GuestCall(instance, call) => f
                .debug_tuple("GuestCall")
                .field(instance)
                .field(call)
                .finish(),
            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
        }
    }
}
729
/// Whether a suspension intrinsic was cancelled or completed
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The wait was cancelled.
    Cancelled,
    /// The wait completed normally.
    Completed,
}
736
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.unwrap_current_host_thread();

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It completed immediately; check the result and delete the task.
        Poll::Ready(result) => result?,

        // It did not complete immediately; add it to
        // `ConcurrentState::futures` so it will be polled via the event loop;
        // then use `GuestTask::sync_call_set` to wait for the task to
        // complete, suspending the current fiber until it does so.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            // Suspend this fiber until an event is delivered to `set`,
            // i.e. until the wrapper future above records completion.
            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Remove the `task` from the `sync_call_set` to ensure that when
            // this function returns and the task is deleted that there are no
            // more lingering references to this host task.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Retrieve and return the result.  By this point the wrapper future must
    // have stored `CalleeFinished`; any other state is a bug.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone) {
        HostTaskState::CalleeFinished(result) => Ok(*result.downcast().unwrap()),
        _ => panic!("unexpected host task state after completion"),
    }
}
818
/// Execute the specified guest call.
///
/// A call may produce a follow-up `GuestCall` (via `handle_callback_code` or
/// an implicit-start closure), in which case we loop and execute that one as
/// well, continuing until no further call is produced.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // Pop the next event for this waitable set.  NOTE(review):
                // the `unwrap` assumes an event is always available by the
                // time this work item runs — confirm against the scheduler.
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Guest-visible handle for the waitable, or 0 if there is none.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make the callee's thread current for the duration of the
                // callback, restoring the previous thread afterwards.
                let old_thread = store.set_thread(call.thread);
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily move the callback out of the task so it can be
                // invoked with exclusive access to the store, then put it
                // back once it returns.
                let callback = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                    .unwrap();

                let code = callback(store, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread);

                // The callback's return code may schedule a follow-up call,
                // which we'll execute on the next loop iteration.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                // The restore itself happened just above via
                // `set_thread(old_thread)`; this trace merely records it.
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                // An implicit start may itself yield a follow-up call.
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
886
887impl<T> Store<T> {
888    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
889    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
890    where
891        T: Send + 'static,
892    {
893        ensure!(
894            self.as_context().0.concurrency_support(),
895            "cannot use `run_concurrent` when Config::concurrency_support disabled",
896        );
897        self.as_context_mut().run_concurrent(fun).await
898    }
899
900    #[doc(hidden)]
901    pub fn assert_concurrent_state_empty(&mut self) {
902        self.as_context_mut().assert_concurrent_state_empty();
903    }
904
905    #[doc(hidden)]
906    pub fn concurrent_state_table_size(&mut self) -> usize {
907        self.as_context_mut().concurrent_state_table_size()
908    }
909
910    /// Convenience wrapper for [`StoreContextMut::spawn`].
911    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
912    where
913        T: 'static,
914    {
915        self.as_context_mut().spawn(task)
916    }
917}
918
919impl<T> StoreContextMut<'_, T> {
920    /// Assert that all the relevant tables and queues in the concurrent state
921    /// for this store are empty.
922    ///
923    /// This is for sanity checking in integration tests
924    /// (e.g. `component-async-tests`) that the relevant state has been cleared
925    /// after each test concludes.  This should help us catch leaks, e.g. guest
926    /// tasks which haven't been deleted despite having completed and having
927    /// been dropped by their supertasks.
928    ///
929    /// Only intended for use in Wasmtime's own testing.
930    #[doc(hidden)]
931    pub fn assert_concurrent_state_empty(self) {
932        let store = self.0;
933        store
934            .store_data_mut()
935            .components
936            .assert_instance_states_empty();
937        let state = store.concurrent_state_mut();
938        assert!(
939            state.table.get_mut().is_empty(),
940            "non-empty table: {:?}",
941            state.table.get_mut()
942        );
943        assert!(state.high_priority.is_empty());
944        assert!(state.low_priority.is_empty());
945        assert!(state.current_thread.is_none());
946        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
947        assert!(state.global_error_context_ref_counts.is_empty());
948    }
949
950    /// Helper function to perform tests over the size of the concurrent state
951    /// table which can be useful for detecting leaks.
952    ///
953    /// Only intended for use in Wasmtime's own testing.
954    #[doc(hidden)]
955    pub fn concurrent_state_table_size(&mut self) -> usize {
956        self.0
957            .concurrent_state_mut()
958            .table
959            .get_mut()
960            .iter_mut()
961            .count()
962    }
963
964    /// Spawn a background task to run as part of this instance's event loop.
965    ///
966    /// The task will receive an `&Accessor<U>` and run concurrently with
967    /// any other tasks in progress for the instance.
968    ///
969    /// Note that the task will only make progress if and when the event loop
970    /// for this instance is run.
971    ///
972    /// The returned [`JoinHandle`] may be used to cancel the task.
973    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
974    where
975        T: 'static,
976    {
977        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
978        self.spawn_with_accessor(accessor, task)
979    }
980
981    /// Internal implementation of `spawn` functions where a `store` is
982    /// available along with an `Accessor`.
983    fn spawn_with_accessor<D>(
984        self,
985        accessor: Accessor<T, D>,
986        task: impl AccessorTask<T, D>,
987    ) -> JoinHandle
988    where
989        T: 'static,
990        D: HasData + ?Sized,
991    {
992        // Create an "abortable future" here where internally the future will
993        // hook calls to poll and possibly spawn more background tasks on each
994        // iteration.
995        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
996        self.0
997            .concurrent_state_mut()
998            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
999        handle
1000    }
1001
1002    /// Run the specified closure `fun` to completion as part of this store's
1003    /// event loop.
1004    ///
1005    /// This will run `fun` as part of this store's event loop until it
1006    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1007    /// controlled access to the store and its data.
1008    ///
1009    /// This function can be used to invoke [`Func::call_concurrent`] for
1010    /// example within the async closure provided here.
1011    ///
1012    /// This function will unconditionally return an error if
1013    /// [`Config::concurrency_support`] is disabled.
1014    ///
1015    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1016    ///
1017    /// # Store-blocking behavior
1018    ///
1019    /// At this time there are certain situations in which the `Future` returned
1020    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1021    /// extended period of time, despite one or more `Waker::wake` events having
1022    /// occurred for the task to which it belongs.  This can manifest as the
1023    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1024    /// the `Store` being held by e.g. a blocking host function, preventing the
1025    /// `Future` from being polled. A canonical example of this is when the
1026    /// `fun` provided to this function attempts to set a timeout for an
1027    /// invocation of a wasm function. In this situation the async closure is
1028    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1029    /// to elapse. At this time this setup will not always work and the timeout
1030    /// may not reliably fire.
1031    ///
1032    /// This function will not block the current thread and as such is always
1033    /// suitable to run in an `async` context, but the current implementation of
1034    /// Wasmtime can lead to situations where a certain wasm computation is
1035    /// required to make progress the closure to make progress. This is an
1036    /// artifact of Wasmtime's historical implementation of `async` functions
1037    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1038    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1039    /// progress for a readiness notification of (b) to get delivered.
1040    ///
1041    /// This effectively means that it's not possible to reliably perform a
1042    /// "select" operation within the `fun` closure, which timeouts for example
1043    /// are based on. Fixing this requires some relatively major refactoring
1044    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1045    /// that is intended to be fixed one day. In the meantime it's recommended
1046    /// to apply timeouts or such to the entire `run_concurrent` call itself
1047    /// rather than internally.
1048    ///
1049    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1050    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1051    ///
1052    /// # Example
1053    ///
1054    /// ```
1055    /// # use {
1056    /// #   wasmtime::{
1057    /// #     error::{Result},
1058    /// #     component::{ Component, Linker, Resource, ResourceTable},
1059    /// #     Config, Engine, Store
1060    /// #   },
1061    /// # };
1062    /// #
1063    /// # struct MyResource(u32);
1064    /// # struct Ctx { table: ResourceTable }
1065    /// #
1066    /// # async fn foo() -> Result<()> {
1067    /// # let mut config = Config::new();
1068    /// # let engine = Engine::new(&config)?;
1069    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1070    /// # let mut linker = Linker::new(&engine);
1071    /// # let component = Component::new(&engine, "")?;
1072    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1073    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1074    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1075    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1076    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1077    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1078    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1079    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1080    ///    Ok(())
1081    /// }).await??;
1082    /// # Ok(())
1083    /// # }
1084    /// ```
1085    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1086    where
1087        T: Send + 'static,
1088    {
1089        ensure!(
1090            self.0.concurrency_support(),
1091            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1092        );
1093        self.do_run_concurrent(fun, false).await
1094    }
1095
1096    pub(super) async fn run_concurrent_trap_on_idle<R>(
1097        self,
1098        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1099    ) -> Result<R>
1100    where
1101        T: Send + 'static,
1102    {
1103        self.do_run_concurrent(fun, true).await
1104    }
1105
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Wraps the future produced by `fun` so that it is dropped with this
    /// store's TLS context set (see `Dropper` below), then drives it to
    /// completion via `poll_until`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        // Guard against re-entrant invocations of the event loop; see
        // `check_recursive_run` for details.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Ensures the wrapped future (`value`) is dropped inside `tls::set`,
        // i.e. with the store available to any drop-time store accesses,
        // regardless of how this function exits.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1149
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// Each iteration of the outer loop polls, in order: the caller's
    /// `future`, the store's `ConcurrentState::futures`, and then the queued
    /// work items -- see the inline comments below.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts `ConcurrentState::futures` back into the store
        // when dropped, including on early return via `?` or panic.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // Outcome of a single `poll_fn` pass below.
            enum PollResult<R> {
                Complete(R),
                ProcessWork(Vec<WorkItem>),
            }
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if any.
                // This will be either all of the high-priority work items, or if
                // there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = state.collect_work_items_to_run();
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork(ready) => {
                    // Guard which disposes of any not-yet-handled work items
                    // (e.g. if `handle_work_item` returns an error below) so
                    // fibers and futures they hold are cleaned up with the
                    // store available.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1316
    /// Handle the specified work item, possibly resuming a fiber if applicable.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            // Add the future to `ConcurrentState::futures` so the event loop
            // polls it going forward.
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            // Resume a guest thread, which must currently be in the `Ready`
            // state (holding its suspended fiber); mark it `Running` first.
            WorkItem::ResumeThread(_, thread) => {
                if let GuestThreadState::Ready(fiber) = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    // The callee can be entered now; run the call on the
                    // worker fiber.
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call can't start yet.  Notify the caller (once)
                    // that the subtask is in the `Starting` state, then park
                    // the call in the instance's `pending` map.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }
1379
    /// Execute the specified guest call on a worker fiber.
    ///
    /// The worker fiber is created lazily on first use and reused thereafter
    /// (it is taken from and returned to `ConcurrentState::worker`); work is
    /// handed to it via the `ConcurrentState::worker_item` slot.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            // No idle worker available; create one.  The fiber loops
            // forever: run the current `worker_item`, then suspend until
            // more work arrives.
            fiber::make_fiber(self.0, move |store| {
                loop {
                    match store.concurrent_state_mut().worker_item.take().unwrap() {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        // Stash the item where the worker fiber will pick it up; the slot
        // must be empty since only one item is dispatched at a time.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }
1406
1407    /// Wrap the specified host function in a future which will call it, passing
1408    /// it an `&Accessor<T>`.
1409    ///
1410    /// See the `Accessor` documentation for details.
1411    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1412    where
1413        T: 'static,
1414        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1415            + Send
1416            + Sync
1417            + 'static,
1418        R: Send + Sync + 'static,
1419    {
1420        let token = StoreToken::new(self);
1421        async move {
1422            let mut accessor = Accessor::new(token);
1423            closure(&mut accessor).await
1424        }
1425    }
1426}
1427
1428impl StoreOpaque {
    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
    /// guest-to-guest call or a sync host-to-guest call.
    ///
    /// This task will only be used for the purpose of handling calls to
    /// intrinsic functions; both parameter lowering and result lifting are
    /// assumed to be taken care of elsewhere.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        // Without concurrency support there is no task bookkeeping to do.
        if !self.concurrency_support() {
            return Ok(self.enter_call_not_concurrent());
        }

        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        // If the current thread is a guest thread, record which instance it
        // belongs to so we can sanity-check `guest_caller` below.
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        let task = GuestTask::new(
            state,
            // Per the doc comment above, lowering/lifting are handled
            // elsewhere, so these closures must never be invoked.
            Box::new(move |_, _| unreachable!()),
            LiftResult {
                lift: Box::new(move |_, _| unreachable!()),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(caller) = guest_caller {
                assert_eq!(caller, instance.unwrap());
                Caller::Guest {
                    thread: *thread.guest().unwrap(),
                }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        // Create an implicit thread for the new task and register it with the
        // callee instance's handle table.
        let guest_task = state.push(task)?;
        let new_thread = GuestThread::new_implicit(guest_task);
        let guest_thread = state.push(new_thread)?;
        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread,
            self,
            callee.index,
        )?;

        let state = self.concurrent_state_mut();
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        // Make the new task/thread pair current; `exit_guest_sync_call`
        // restores the caller.
        self.set_thread(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        });

        Ok(())
    }
1498
    /// Pop a `GuestTask` previously pushed using `enter_guest_sync_call`.
    ///
    /// `guest_caller` indicates whether the caller was expected to be a guest
    /// thread (as opposed to the host); this is asserted against the task's
    /// recorded `Caller` below.
    pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(self.exit_call_not_concurrent());
        }
        // Clear the current thread; it must have been a guest thread (the one
        // pushed by `enter_guest_sync_call`).
        let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
        log::trace!("exit sync call {instance:?}");
        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
            self,
            thread,
            instance.index,
        )?;

        // Restore the caller as the current thread, using the task's `Caller`
        // record to determine whether that caller was a guest or the host.
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        let caller = match &task.caller {
            &Caller::Guest { thread } => {
                assert!(guest_caller);
                thread.into()
            }
            &Caller::Host { caller, .. } => {
                assert!(!guest_caller);
                caller
            }
        };
        self.set_thread(caller);

        // If the task has reached a deletable state, remove and dispose of it
        // now rather than leaving it in the table.
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        if task.ready_to_delete() {
            state.delete(thread.task)?.dispose(state)?;
        }

        Ok(())
    }
1535
1536    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1537    /// transition to the host.
1538    ///
1539    /// FIXME: this is called for all guest->host transitions and performs some
1540    /// relatively expensive table manipulations. This would ideally be
1541    /// optimized to avoid the full allocation of a `HostTask` in at least some
1542    /// situations.
1543    pub fn enter_host_call(&mut self) -> Result<()> {
1544        if !self.concurrency_support() {
1545            self.enter_call_not_concurrent();
1546            return Ok(());
1547        }
1548        let state = self.concurrent_state_mut();
1549        let caller = state.unwrap_current_guest_thread();
1550        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1551        log::trace!("new host task {task:?}");
1552        self.set_thread(task);
1553        Ok(())
1554    }
1555
1556    /// Dual of `enter_host_call` and signifies that the host has finished and
1557    /// will be cleaned up.
1558    ///
1559    /// Note that this isn't invoked when the host is invoked asynchronously and
1560    /// the host isn't complete yet. In that situation the host task persists
1561    /// and will be cleaned up separately.
1562    pub fn exit_host_call(&mut self) -> Result<()> {
1563        if !self.concurrency_support() {
1564            self.exit_call_not_concurrent();
1565            return Ok(());
1566        }
1567        let task = self.concurrent_state_mut().unwrap_current_host_thread();
1568        log::trace!("delete host task {task:?}");
1569        let task = self.concurrent_state_mut().delete(task)?;
1570        self.set_thread(task.caller);
1571        Ok(())
1572    }
1573
    /// Determine whether the specified instance may be entered from the host.
    ///
    /// We return `true` here only if all of the following hold:
    ///
    /// - The top-level instance is not already on the current task's call stack.
    /// - The instance is not in need of a post-return function call.
    /// - `self` has not been poisoned due to a trap.
    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
        if self.trapped() {
            return false;
        }
        if !self.concurrency_support() {
            // Without concurrency support there is no caller chain to check.
            return true;
        }
        // Walk the chain of callers starting from the current thread, looking
        // for the top-level instance we're about to enter.
        let state = self.concurrent_state_mut();
        let mut cur = state.current_thread;
        loop {
            match cur {
                CurrentThread::None => break true,
                CurrentThread::Guest(thread) => {
                    let task = state.get_mut(thread.task).unwrap();

                    // Note that we only compare top-level instance IDs here.
                    // The idea is that the host is not allowed to recursively
                    // enter a top-level instance even if the specific leaf
                    // instance is not on the stack. This is the behavior
                    // defined in the spec, and it allows us to elide runtime
                    // checks in guest-to-guest adapters.
                    if task.instance.instance == instance.instance {
                        break false;
                    }
                    cur = match task.caller {
                        Caller::Host { caller, .. } => caller,
                        Caller::Guest { thread } => thread.into(),
                    };
                }
                CurrentThread::Host(id) => {
                    // Host tasks are transparent here; keep walking up via the
                    // host task's caller.
                    cur = state.get_mut(id).unwrap().caller.into();
                }
            }
        }
    }
1616
1617    /// Helper function to retrieve the `InstanceState` for the
1618    /// specified instance.
1619    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1620        self.component_instance_mut(instance.instance)
1621            .instance_state(instance.index)
1622    }
1623
    /// Replace the current thread with `thread`, returning the previous
    /// value.
    ///
    /// Besides swapping `ConcurrentState::current_thread`, this maintains the
    /// per-component-instance `task_may_block` flag for both the thread being
    /// switched away from and the one being switched to.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
        // Each time we switch threads, we conservatively set `task_may_block`
        // to `false` for the component instance we're switching away from (if
        // any), meaning it will be `false` for any new thread created for that
        // instance unless explicitly set otherwise.
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread.into());
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        // If we're switching to a new thread, set its component instance's
        // `task_may_block` according to where it left off.
        if self.concurrent_state_mut().current_thread.guest().is_some() {
            self.set_task_may_block();
        }

        old_thread
    }
1645
1646    /// Set the global variable representing whether the current task may block
1647    /// prior to entering Wasm code.
1648    fn set_task_may_block(&mut self) {
1649        let state = self.concurrent_state_mut();
1650        let guest_thread = state.unwrap_current_guest_thread();
1651        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1652        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1653        self.component_instance_mut(instance)
1654            .set_task_may_block(may_block)
1655    }
1656
1657    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1658        if !self.concurrency_support() {
1659            return Ok(());
1660        }
1661        let state = self.concurrent_state_mut();
1662        let task = state.unwrap_current_guest_thread().task;
1663        let instance = state.get_mut(task).unwrap().instance.instance;
1664        let task_may_block = self.component_instance(instance).get_task_may_block();
1665
1666        if task_may_block {
1667            Ok(())
1668        } else {
1669            Err(Trap::CannotBlockSyncTask.into())
1670        }
1671    }
1672
1673    /// Record that we're about to enter a (sub-)component instance which does
1674    /// not support more than one concurrent, stackful activation, meaning it
1675    /// cannot be entered again until the next call returns.
1676    fn enter_instance(&mut self, instance: RuntimeInstance) {
1677        log::trace!("enter {instance:?}");
1678        self.instance_state(instance)
1679            .concurrent_state()
1680            .do_not_enter = true;
1681    }
1682
1683    /// Record that we've exited a (sub-)component instance previously entered
1684    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1685    /// See the documentation for the latter for details.
1686    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1687        log::trace!("exit {instance:?}");
1688        self.instance_state(instance)
1689            .concurrent_state()
1690            .do_not_enter = false;
1691        self.partition_pending(instance)
1692    }
1693
    /// Iterate over `InstanceState::pending`, moving any ready items into the
    /// "high priority" work item queue.
    ///
    /// See `GuestCall::is_ready` for details.
    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
        // Take the entire pending map up front so we can call `is_ready`
        // (which needs `&mut self`) while iterating; entries which are still
        // not ready are re-inserted below.
        for (thread, kind) in
            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
        {
            let call = GuestCall { thread, kind };
            if call.is_ready(self)? {
                // Ready: schedule it to run as soon as possible.
                self.concurrent_state_mut()
                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
            } else {
                // Still blocked; put it back in the pending map.
                self.instance_state(instance)
                    .concurrent_state()
                    .pending
                    .insert(call.thread, call.kind);
            }
        }

        Ok(())
    }
1716
1717    /// Implements the `backpressure.{inc,dec}` intrinsics.
1718    pub(crate) fn backpressure_modify(
1719        &mut self,
1720        caller_instance: RuntimeInstance,
1721        modify: impl FnOnce(u16) -> Option<u16>,
1722    ) -> Result<()> {
1723        let state = self.instance_state(caller_instance).concurrent_state();
1724        let old = state.backpressure;
1725        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1726        state.backpressure = new;
1727
1728        if old > 0 && new == 0 {
1729            // Backpressure was previously enabled and is now disabled; move any
1730            // newly-eligible guest calls to the "high priority" queue.
1731            self.partition_pending(caller_instance)?;
1732        }
1733
1734        Ok(())
1735    }
1736
    /// Resume the specified fiber, giving it exclusive access to the specified
    /// store.
    ///
    /// If the fiber suspends again (rather than exiting), it is parked
    /// according to the `SuspendReason` it recorded in
    /// `ConcurrentState::suspend_reason` before suspending.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        let old_thread = self.concurrent_state_mut().current_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        // Run the fiber until it either suspends (`Some(fiber)`) or exits
        // (`None`).
        let fiber = fiber::resolve_or_release(self, fiber).await?;

        self.set_thread(old_thread);

        let state = self.concurrent_state_mut();

        if let Some(ot) = old_thread.guest() {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // See the `SuspendReason` documentation for what each case means.
            match state.suspend_reason.take().unwrap() {
                SuspendReason::NeedWork => {
                    // Keep at most one idle worker fiber around for reuse;
                    // dispose of any extras.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    // The thread yielded voluntarily; requeue it at low
                    // priority so other work gets a chance to run first.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
                    let instance = state.get_mut(thread.task)?.instance.index;
                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    // The thread suspended itself; it will be resumed
                    // explicitly later.
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    // The thread is waiting on a waitable set; park the fiber
                    // there so it's resumed when an event arrives.
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1787
    /// Suspend the current fiber, storing the reason in
    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
    /// it should be resumed.
    ///
    /// See the `SuspendReason` documentation for details.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // If we're yielding or waiting on behalf of a guest thread, we'll need to
        // pop the call context which manages resource borrows before suspending
        // and then push it again once we've resumed.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Remember the current thread so it can be restored after resumption
        // (only relevant when suspending on behalf of a guest thread).
        let old_guest_thread = if task.is_some() {
            self.concurrent_state_mut().current_thread
        } else {
            CurrentThread::None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .guest()
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .unwrap_or(true)
        );

        // Record the reason so `resume_fiber` knows how to park this fiber.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Actually suspend, releasing the store to whoever resumes us.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Resumed: restore the guest thread we suspended on behalf of, if any.
        if task.is_some() {
            self.set_thread(old_guest_thread);
        }

        Ok(())
    }
1848
    /// Block the current guest thread until an event is delivered for
    /// `waitable`.
    ///
    /// The waitable is temporarily joined to the current task's
    /// `sync_call_set` for the duration of the wait; its original set
    /// membership is restored before returning.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.unwrap_current_guest_thread();
        // Remember the waitable's current set so we can restore it afterward.
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        // Restore the waitable's original set membership.
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
1863}
1864
1865impl Instance {
    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
    ///
    /// Task-level events take precedence over waitable-set events.
    /// `Event::Cancelled` is only delivered when `cancellable` is set;
    /// otherwise it is left in place for a later, cancellable poll.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        // First, check for an event stored directly on the task.
        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");

            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // Not allowed to deliver `Cancelled` here; put it back.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }

        // Otherwise, pop the first ready waitable (if any) from the provided
        // set.  The `map`/`transpose` dance propagates any table-access error
        // out of the `Option` chain.
        Ok(
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(store, self, event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }
1913
    /// Handle the `CallbackCode` returned from an async-lifted export or its
    /// callback.
    ///
    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
    /// using `handle_guest_call`.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        // Split the packed code into its discriminant and (optional)
        // waitable-set handle halves.
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Resolve a guest-provided waitable-set handle to its table id,
        // rejecting the (invalid) zero handle.
        let get_set = |store: &mut StoreOpaque, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = store
                .instance_state(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .handle_table()
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // A task whose threads have all exited must have returned (or
                // been cancelled) by now; otherwise the guest broke protocol.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                if let Caller::Guest { .. } = task.caller {
                    task.exited = true;
                    task.callback = None;
                }
                if task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // If an `Event::Cancelled` is pending, we'll deliver that;
                // otherwise, we'll deliver `Event::None`.  Note that
                // `GuestTask::event` is only ever set to one of those two
                // `Event` variants.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task) {
                    // Push this thread onto the "low priority" queue so it runs
                    // after any other threads have had a chance to run.
                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
                    None
                } else {
                    // Yielding in a non-blocking context is defined as a no-op
                    // according to the spec, so we must run this thread
                    // immediately without allowing any others to run.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                // The task may only return `WAIT` if it was created for a call
                // to an async export.  Otherwise, we'll trap.
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is immediately available; deliver it ASAP.
                    state.push_high_priority(WorkItem::GuestCall(
                        runtime_instance,
                        GuestCall {
                            thread: guest_thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance: self,
                                set: Some(set),
                            },
                        },
                    ));
                } else {
                    // No event is immediately available.
                    //
                    // We're waiting, so register to be woken up when an event
                    // is published for this waitable set.
                    //
                    // Here we also set `GuestThread::wake_on_cancel` which
                    // allows `subtask.cancel` to interrupt the wait.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    assert!(old.is_none());
                }
                None
            }
            _ => bail!("unsupported callback code: {code}"),
        })
    }
2041
2042    fn cleanup_thread(
2043        self,
2044        store: &mut StoreOpaque,
2045        guest_thread: QualifiedThreadId,
2046        runtime_instance: RuntimeComponentInstanceIndex,
2047    ) -> Result<()> {
2048        let guest_id = store
2049            .concurrent_state_mut()
2050            .get_mut(guest_thread.thread)?
2051            .instance_rep;
2052        store
2053            .instance_state(RuntimeInstance {
2054                instance: self.id().instance(),
2055                index: runtime_instance,
2056            })
2057            .thread_handle_table()
2058            .guest_thread_remove(guest_id.unwrap())?;
2059
2060        store.concurrent_state_mut().delete(guest_thread.thread)?;
2061        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2062        task.threads.remove(&guest_thread.thread);
2063        Ok(())
2064    }
2065
2066    /// Add the specified guest call to the "high priority" work item queue, to
2067    /// be started as soon as backpressure and/or reentrance rules allow.
2068    ///
2069    /// SAFETY: The raw pointer arguments must be valid references to guest
2070    /// functions (with the appropriate signatures) when the closures queued by
2071    /// this function are called.
2072    unsafe fn queue_call<T: 'static>(
2073        self,
2074        mut store: StoreContextMut<T>,
2075        guest_thread: QualifiedThreadId,
2076        callee: SendSyncPtr<VMFuncRef>,
2077        param_count: usize,
2078        result_count: usize,
2079        async_: bool,
2080        callback: Option<SendSyncPtr<VMFuncRef>>,
2081        post_return: Option<SendSyncPtr<VMFuncRef>>,
2082    ) -> Result<()> {
2083        /// Return a closure which will call the specified function in the scope
2084        /// of the specified task.
2085        ///
2086        /// This will use `GuestTask::lower_params` to lower the parameters, but
2087        /// will not lift the result; instead, it returns a
2088        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2089        /// any, may be lifted.  Note that an async-lifted export will have
2090        /// returned its result using the `task.return` intrinsic (or not
2091        /// returned a result at all, in the case of `task.cancel`), in which
2092        /// case the "result" of this call will either be a callback code or
2093        /// nothing.
2094        ///
2095        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2096        /// the returned closure is called.
2097        unsafe fn make_call<T: 'static>(
2098            store: StoreContextMut<T>,
2099            guest_thread: QualifiedThreadId,
2100            callee: SendSyncPtr<VMFuncRef>,
2101            param_count: usize,
2102            result_count: usize,
2103        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2104        + Send
2105        + Sync
2106        + 'static
2107        + use<T> {
2108            let token = StoreToken::new(store);
2109            move |store: &mut dyn VMStore| {
2110                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2111
2112                store
2113                    .concurrent_state_mut()
2114                    .get_mut(guest_thread.thread)?
2115                    .state = GuestThreadState::Running;
2116                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2117                let lower = task.lower_params.take().unwrap();
2118
2119                lower(store, &mut storage[..param_count])?;
2120
2121                let mut store = token.as_context_mut(store);
2122
2123                // SAFETY: Per the contract documented in `make_call's`
2124                // documentation, `callee` must be a valid pointer.
2125                unsafe {
2126                    crate::Func::call_unchecked_raw(
2127                        &mut store,
2128                        callee.as_non_null(),
2129                        NonNull::new(
2130                            &mut storage[..param_count.max(result_count)]
2131                                as *mut [MaybeUninit<ValRaw>] as _,
2132                        )
2133                        .unwrap(),
2134                    )?;
2135                }
2136
2137                Ok(storage)
2138            }
2139        }
2140
2141        // SAFETY: Per the contract described in this function documentation,
2142        // the `callee` pointer which `call` closes over must be valid when
2143        // called by the closure we queue below.
2144        let call = unsafe {
2145            make_call(
2146                store.as_context_mut(),
2147                guest_thread,
2148                callee,
2149                param_count,
2150                result_count,
2151            )
2152        };
2153
2154        let callee_instance = store
2155            .0
2156            .concurrent_state_mut()
2157            .get_mut(guest_thread.task)?
2158            .instance;
2159
2160        let fun = if callback.is_some() {
2161            assert!(async_);
2162
2163            Box::new(move |store: &mut dyn VMStore| {
2164                self.add_guest_thread_to_instance_table(
2165                    guest_thread.thread,
2166                    store,
2167                    callee_instance.index,
2168                )?;
2169                let old_thread = store.set_thread(guest_thread);
2170                log::trace!(
2171                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2172                );
2173
2174                store.enter_instance(callee_instance);
2175
2176                // SAFETY: See the documentation for `make_call` to review the
2177                // contract we must uphold for `call` here.
2178                //
2179                // Per the contract described in the `queue_call`
2180                // documentation, the `callee` pointer which `call` closes
2181                // over must be valid.
2182                let storage = call(store)?;
2183
2184                store.exit_instance(callee_instance)?;
2185
2186                store.set_thread(old_thread);
2187                let state = store.concurrent_state_mut();
2188                old_thread
2189                    .guest()
2190                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2191                log::trace!("stackless call: restored {old_thread:?} as current thread");
2192
2193                // SAFETY: `wasmparser` will have validated that the callback
2194                // function returns a `i32` result.
2195                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2196
2197                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2198            })
2199                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2200        } else {
2201            let token = StoreToken::new(store.as_context_mut());
2202            Box::new(move |store: &mut dyn VMStore| {
2203                self.add_guest_thread_to_instance_table(
2204                    guest_thread.thread,
2205                    store,
2206                    callee_instance.index,
2207                )?;
2208                let old_thread = store.set_thread(guest_thread);
2209                log::trace!(
2210                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2211                );
2212                let flags = self.id().get(store).instance_flags(callee_instance.index);
2213
2214                // Unless this is a callback-less (i.e. stackful)
2215                // async-lifted export, we need to record that the instance
2216                // cannot be entered until the call returns.
2217                if !async_ {
2218                    store.enter_instance(callee_instance);
2219                }
2220
2221                // SAFETY: See the documentation for `make_call` to review the
2222                // contract we must uphold for `call` here.
2223                //
2224                // Per the contract described in the `queue_call`
2225                // documentation, the `callee` pointer which `call` closes
2226                // over must be valid.
2227                let storage = call(store)?;
2228
2229                if async_ {
2230                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2231                    if task.threads.len() == 1 && !task.returned_or_cancelled() {
2232                        bail!(Trap::NoAsyncResult);
2233                    }
2234                } else {
2235                    // This is a sync-lifted export, so now is when we lift the
2236                    // result, optionally call the post-return function, if any,
2237                    // and finally notify any current or future waiters that the
2238                    // subtask has returned.
2239
2240                    let lift = {
2241                        store.exit_instance(callee_instance)?;
2242
2243                        let state = store.concurrent_state_mut();
2244                        assert!(state.get_mut(guest_thread.task)?.result.is_none());
2245
2246                        state
2247                            .get_mut(guest_thread.task)?
2248                            .lift_result
2249                            .take()
2250                            .unwrap()
2251                    };
2252
2253                    // SAFETY: `result_count` represents the number of core Wasm
2254                    // results returned, per `wasmparser`.
2255                    let result = (lift.lift)(store, unsafe {
2256                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2257                            &storage[..result_count],
2258                        )
2259                    })?;
2260
2261                    let post_return_arg = match result_count {
2262                        0 => ValRaw::i32(0),
2263                        // SAFETY: `result_count` represents the number of
2264                        // core Wasm results returned, per `wasmparser`.
2265                        1 => unsafe { storage[0].assume_init() },
2266                        _ => unreachable!(),
2267                    };
2268
2269                    unsafe {
2270                        call_post_return(
2271                            token.as_context_mut(store),
2272                            post_return.map(|v| v.as_non_null()),
2273                            post_return_arg,
2274                            flags,
2275                        )?;
2276                    }
2277
2278                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2279                }
2280
2281                // This is a callback-less call, so the implicit thread has now completed
2282                self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2283
2284                store.set_thread(old_thread);
2285
2286                let state = store.concurrent_state_mut();
2287                let task = state.get_mut(guest_thread.task)?;
2288
2289                match &task.caller {
2290                    Caller::Host { .. } => {
2291                        if task.ready_to_delete() {
2292                            Waitable::Guest(guest_thread.task).delete_from(state)?;
2293                        }
2294                    }
2295                    Caller::Guest { .. } => {
2296                        task.exited = true;
2297                    }
2298                }
2299
2300                Ok(None)
2301            })
2302        };
2303
2304        store
2305            .0
2306            .concurrent_state_mut()
2307            .push_high_priority(WorkItem::GuestCall(
2308                callee_instance.index,
2309                GuestCall {
2310                    thread: guest_thread,
2311                    kind: GuestCallKind::StartImplicit(fun),
2312                },
2313            ));
2314
2315        Ok(())
2316    }
2317
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// This allocates a new `GuestTask` (plus its implicit `GuestThread`) for
    /// the callee and makes that thread "current"; the lower-params and
    /// lift-result closures stored on the task close over `start`/`return_`
    /// and are invoked later, once the call actually runs.
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: u8,
        caller_info: CallerInfo,
    ) -> Result<()> {
        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // Where the caller expects the result to be delivered: either spilled
        // to linear memory via a return pointer passed as the caller's last
        // parameter (`Heap`) or returned directly as flat core values
        // (`Stack`).
        enum ResultInfo {
            Heap { results: u32 },
            Stack { result_count: u32 },
        }

        let result_info = match &caller_info {
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: params.last().unwrap().get_u32(),
            },
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
                results: params.last().unwrap().get_u32(),
            },
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        // A sync-lowered caller needs its flat result stashed in
        // `GuestTask::sync_result` by the `lift` closure below so
        // `Self::start_call` can copy it out.
        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let old_thread = state.unwrap_current_guest_thread();

        // Sanity check: the currently-running task must belong to the
        // caller's instance.
        assert_eq!(
            state.get_mut(old_thread.task)?.instance,
            RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            }
        );

        let new_task = GuestTask::new(
            state,
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes count parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                dst.copy_from_slice(&src[..dst.len()]);
                let state = store.0.concurrent_state_mut();
                // Parameters have been delivered; notify current and future
                // waiters that the subtask has started.
                Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                lift: Box::new(move |store, src| {
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    // A heap result requires appending the caller's return
                    // pointer as a trailing parameter for the `return_`
                    // adapter.
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }
                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }
                    let state = store.0.concurrent_state_mut();
                    let thread = state.unwrap_current_guest_thread();
                    // Stash a flat (stack) result for a sync-lowered caller so
                    // `Self::start_call` can copy it into the caller-provided
                    // `storage` once the callee returns.
                    if sync_caller {
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
            },
            Caller::Guest { thread: old_thread },
            None,
            RuntimeInstance {
                instance: self.id().instance(),
                index: callee_instance,
            },
            callee_async,
        )?;

        // Register the new task plus its implicit thread in the concurrent
        // state's table.
        let guest_task = state.push(new_task)?;
        let new_thread = GuestThread::new_implicit(guest_task);
        let guest_thread = state.push(new_thread)?;
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        });
        log::trace!(
            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
        );

        Ok(())
    }
2517
2518    /// Call the specified callback function for an async-lifted export.
2519    ///
2520    /// SAFETY: `function` must be a valid reference to a guest function of the
2521    /// correct signature for a callback.
2522    unsafe fn call_callback<T>(
2523        self,
2524        mut store: StoreContextMut<T>,
2525        function: SendSyncPtr<VMFuncRef>,
2526        event: Event,
2527        handle: u32,
2528    ) -> Result<u32> {
2529        let (ordinal, result) = event.parts();
2530        let params = &mut [
2531            ValRaw::u32(ordinal),
2532            ValRaw::u32(handle),
2533            ValRaw::u32(result),
2534        ];
2535        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2536        // `wasmtime-cranelift`-generated fused adapter code or
2537        // `component::Options`.  Per `wasmparser` callback signature
2538        // validation, we know it takes three parameters and returns one.
2539        unsafe {
2540            crate::Func::call_unchecked_raw(
2541                &mut store,
2542                function.as_non_null(),
2543                params.as_mut_slice().into(),
2544            )?;
2545        }
2546        Ok(params[0].get_u32())
2547    }
2548
    /// Start a guest->guest call previously prepared using
    /// `Self::prepare_call`.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
    /// this function immediately after calling `Self::prepare_call`.
    ///
    /// The call itself is queued as a work item and this fiber is suspended
    /// until the callee at least starts; the returned `u32` packs the
    /// subtask's `Status` together with an optional waitable handle (for an
    /// async-lowered caller).
    ///
    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
    /// functions with the appropriate signatures for the current guest task.
    /// If this is a call to an async-lowered import, the actual call may be
    /// deferred and run after this function returns, in which case the pointer
    /// arguments must also be valid when the call happens.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // A sync-lowered caller passes `storage` to receive flat results; an
        // async-lowered caller passes `None` and gets a status/handle instead.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        // The thread made current by `Self::prepare_call` is the callee.
        let guest_thread = state.unwrap_current_guest_thread();
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
        let param_count = usize::try_from(param_count).unwrap();
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count).unwrap();
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if !callback.is_null() {
            // We're calling an async-lifted export with a callback, so store
            // the callback and related context as part of the task so we can
            // call it later when needed.
            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
            task.callback = Some(Box::new(move |store, event, handle| {
                let store = token.as_context_mut(store);
                unsafe { self.call_callback::<T>(store, callback, event, handle) }
            }));
        }

        let Caller::Guest { thread: caller } = &task.caller else {
            // As of this writing, `start_call` is only used for guest->guest
            // calls.
            unreachable!()
        };
        let caller = *caller;
        let caller_instance = state.get_mut(caller.task)?.instance;

        // Queue the call as a "high priority" work item.
        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Use the caller's `GuestTask::sync_call_set` to register interest in
        // the subtask...
        let guest_waitable = Waitable::Guest(guest_thread.task);
        // Remember the waitable's previous set (if any) so it can be restored
        // once we're done waiting below.
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        // ... and suspend this fiber temporarily while we wait for it to start.
        //
        // Note that we _could_ call the callee directly using the current fiber
        // rather than suspend this one, but that would make reasoning about the
        // event loop more complicated and is probably only worth doing if
        // there's a measurable performance benefit.  In addition, it would mean
        // blocking the caller if the callee calls a blocking sync-lowered
        // import, and as of this writing the spec says we must not do that.
        //
        // Alternatively, the fused adapter code could be modified to call the
        // callee directly without calling a host-provided intrinsic at all (in
        // which case it would need to do its own, inline backpressure checks,
        // etc.).  Again, we'd want to see a measurable performance benefit
        // before committing to such an optimization.  And again, we'd need to
        // update the spec to allow that.
        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // Normally, `StoreOpaque::suspend` would assert it's being
                // called from a context where blocking is allowed.  However, if
                // `async_caller` is `true`, we'll only "block" long enough for
                // the callee to start, i.e. we won't repeat this loop, so we
                // tell `suspend` it's okay even if we're not allowed to block.
                // Alternatively, if the callee is not an async function, then
                // we know it won't block anyway.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                unreachable!();
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // It returned, so we can stop waiting.
                break (status, None);
            } else if async_caller {
                // It hasn't returned yet, but the caller is calling via an
                // async-lowered import, so we generate a handle for the task
                // waitable and return the status.
                let handle = store
                    .0
                    .instance_state(caller_instance)
                    .handle_table()
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // The callee hasn't returned yet, and the caller is calling via
                // a sync-lowered import, so we loop and keep waiting until the
                // callee returns.
            }
        };

        // Restore the waitable's original set membership now that we've
        // stopped waiting on it.
        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // Reset the current thread to point to the caller as it resumes control.
        store.0.set_thread(caller);
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            // The caller used a sync-lowered import to call an async-lifted
            // export, in which case the result, if any, has been stashed in
            // `GuestTask::sync_result`.
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take() {
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        // Pack the status and optional waitable handle into the single `u32`
        // the adapter expects.
        Ok(status.pack(waitable))
    }
2717
    /// Poll the specified future once on behalf of a guest->host call using an
    /// async-lowered import.
    ///
    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
    /// `Pending`, add it to the set of futures to be polled as part of this
    /// instance's event loop until it completes, and then return
    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
    ///
    /// Whether the future returns `Ready` immediately or later, the `lower`
    /// function will be used to lower the result, if any, into the guest caller's
    /// stack and linear memory. The `lower` function is invoked with `None` if
    /// the future is cancelled.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let task = state.unwrap_current_host_thread();

        // Create an abortable future which hooks calls to poll and manages call
        // context state for the future.
        let (join_handle, future) = JoinHandle::run(future);
        {
            // The host task must be freshly started; record that its callee is
            // now running and retain the join handle for later cancellation.
            let state = &mut state.get_mut(task)?.state;
            assert!(matches!(state, HostTaskState::CalleeStarted));
            *state = HostTaskState::CalleeRunning(join_handle);
        }

        let mut future = Box::pin(future);

        // Finally, poll the future.  We can use a dummy `Waker` here because
        // we'll add the future to `ConcurrentState::futures` and poll it
        // automatically from the event loop if it doesn't complete immediately
        // here.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        match poll {
            // It finished immediately; lower the result and delete the task.
            Poll::Ready(Some(result)) => {
                lower(store.as_context_mut(), Some(result?))?;
                return Ok(None);
            }

            // Shouldn't be possible since the future isn't cancelled via the
            // `join_handle`.
            Poll::Ready(None) => unreachable!(),

            // Future isn't ready yet, so fall through.
            Poll::Pending => {}
        }

        // It hasn't finished yet; add the future to
        // `ConcurrentState::futures` so it will be polled by the event
        // loop and allocate a waitable handle to return to the guest.

        // Wrap the future in a closure responsible for lowering the result into
        // the guest's stack and memory, as well as notifying any waiters that
        // the task returned.
        let future = Box::pin(async move {
            // `None` here means the future was cancelled via its
            // `JoinHandle`; that's forwarded to `lower` as `None` below.
            let result = match future.await {
                Some(result) => Some(result?),
                None => None,
            };
            let on_complete = move |store: &mut dyn VMStore| {
                // Restore the `current_thread` to be the host so `lower` knows
                // how to manipulate borrows and knows which scope of borrows
                // to check.
                let mut store = token.as_context_mut(store);
                let state = store.0.concurrent_state_mut();
                assert!(state.current_thread.is_none());
                store.0.set_thread(task);

                let status = if result.is_some() {
                    Status::Returned
                } else {
                    Status::ReturnCancelled
                };

                lower(store.as_context_mut(), result)?;
                let state = store.0.concurrent_state_mut();
                state.get_mut(task)?.state = HostTaskState::CalleeDone;
                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

                // Go back to "no current thread" at the end.
                store.0.set_thread(CurrentThread::None);
                Ok(())
            };

            // Here we schedule a task to run on a worker fiber to do the
            // lowering since it may involve a call to the guest's realloc
            // function. This is necessary because calling the guest while
            // there are host embedder frames on the stack is unsound.
            tls::get(move |store| {
                store
                    .concurrent_state_mut()
                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                        on_complete,
                    ))));
                Ok(())
            })
        });

        // Make this task visible to the guest and then record what it
        // was made visible as.
        let state = store.0.concurrent_state_mut();
        state.push_future(future);
        let caller = state.get_mut(task)?.caller;
        let instance = state.get_mut(caller.task)?.instance;
        let handle = store
            .0
            .instance_state(instance)
            .handle_table()
            .subtask_insert_host(task.rep())?;
        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

        // Restore the currently running thread to this host task's
        // caller. Note that the host task isn't deallocated as it's
        // within the store and will get deallocated later.
        store.0.set_thread(caller);
        Ok(Some(handle))
    }
2847
    /// Implements the `task.return` intrinsic, lifting the result for the
    /// current guest task.
    pub(crate) fn task_return(
        self,
        store: &mut dyn VMStore,
        ty: TypeTupleIndex,
        options: OptionsIndex,
        storage: &[ValRaw],
    ) -> Result<()> {
        let state = store.concurrent_state_mut();
        let guest_thread = state.unwrap_current_guest_thread();
        // Taking `lift_result` doubles as the "has this task already returned
        // or cancelled?" flag: a second call is an error.
        let lift = state
            .get_mut(guest_thread.task)?
            .lift_result
            .take()
            .ok_or_else(|| {
                format_err!("`task.return` or `task.cancel` called more than once for current task")
            })?;
        assert!(state.get_mut(guest_thread.task)?.result.is_none());

        let CanonicalOptions {
            string_encoding,
            data_model,
            ..
        } = &self.id().get(store).component().env_component().options[options];

        // Validate that the type, string encoding, and memory passed to
        // `task.return` agree with those the callee was lifted with; any
        // mismatch is an error.
        let invalid = ty != lift.ty
            || string_encoding != &lift.string_encoding
            || match data_model {
                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
                    Some(memory) => {
                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
                        let actual = self.id().get(store).runtime_memory(memory);
                        expected != actual.as_ptr()
                    }
                    // Memory not specified, meaning it didn't need to be
                    // specified per validation, so not invalid.
                    None => false,
                },
                // Always invalid as this isn't supported.
                CanonicalOptionsDataModel::Gc { .. } => true,
            };

        if invalid {
            bail!("invalid `task.return` signature and/or options for current task");
        }

        log::trace!("task.return for {guest_thread:?}");

        // Lift the guest-provided result and complete the task, notifying any
        // waiters.
        let result = (lift.lift)(store, storage)?;
        self.task_complete(store, guest_thread.task, result, Status::Returned)
    }
2900
2901    /// Implements the `task.cancel` intrinsic.
2902    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2903        let state = store.concurrent_state_mut();
2904        let guest_thread = state.unwrap_current_guest_thread();
2905        let task = state.get_mut(guest_thread.task)?;
2906        if !task.cancel_sent {
2907            bail!("`task.cancel` called by task which has not been cancelled")
2908        }
2909        _ = task.lift_result.take().ok_or_else(|| {
2910            format_err!("`task.return` or `task.cancel` called more than once for current task")
2911        })?;
2912
2913        assert!(task.result.is_none());
2914
2915        log::trace!("task.cancel for {guest_thread:?}");
2916
2917        self.task_complete(
2918            store,
2919            guest_thread.task,
2920            Box::new(DummyResult),
2921            Status::ReturnCancelled,
2922        )
2923    }
2924
2925    /// Complete the specified guest task (i.e. indicate that it has either
2926    /// returned a (possibly empty) result or cancelled itself).
2927    ///
2928    /// This will return any resource borrows and notify any current or future
2929    /// waiters that the task has completed.
2930    fn task_complete(
2931        self,
2932        store: &mut StoreOpaque,
2933        guest_task: TableId<GuestTask>,
2934        result: Box<dyn Any + Send + Sync>,
2935        status: Status,
2936    ) -> Result<()> {
2937        store
2938            .component_resource_tables(Some(self))
2939            .validate_scope_exit()?;
2940
2941        let state = store.concurrent_state_mut();
2942        let task = state.get_mut(guest_task)?;
2943
2944        if let Caller::Host { tx, .. } = &mut task.caller {
2945            if let Some(tx) = tx.take() {
2946                _ = tx.send(result);
2947            }
2948        } else {
2949            task.result = Some(result);
2950            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2951        }
2952
2953        Ok(())
2954    }
2955
2956    /// Implements the `waitable-set.new` intrinsic.
2957    pub(crate) fn waitable_set_new(
2958        self,
2959        store: &mut StoreOpaque,
2960        caller_instance: RuntimeComponentInstanceIndex,
2961    ) -> Result<u32> {
2962        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2963        let handle = store
2964            .instance_state(RuntimeInstance {
2965                instance: self.id().instance(),
2966                index: caller_instance,
2967            })
2968            .handle_table()
2969            .waitable_set_insert(set.rep())?;
2970        log::trace!("new waitable set {set:?} (handle {handle})");
2971        Ok(handle)
2972    }
2973
2974    /// Implements the `waitable-set.drop` intrinsic.
2975    pub(crate) fn waitable_set_drop(
2976        self,
2977        store: &mut StoreOpaque,
2978        caller_instance: RuntimeComponentInstanceIndex,
2979        set: u32,
2980    ) -> Result<()> {
2981        let rep = store
2982            .instance_state(RuntimeInstance {
2983                instance: self.id().instance(),
2984                index: caller_instance,
2985            })
2986            .handle_table()
2987            .waitable_set_remove(set)?;
2988
2989        log::trace!("drop waitable set {rep} (handle {set})");
2990
2991        let set = store
2992            .concurrent_state_mut()
2993            .delete(TableId::<WaitableSet>::new(rep))?;
2994
2995        if !set.waiting.is_empty() {
2996            bail!("cannot drop waitable set with waiters");
2997        }
2998
2999        Ok(())
3000    }
3001
    /// Implements the `waitable.join` intrinsic.
    ///
    /// Adds the waitable identified by `waitable_handle` to the waitable set
    /// identified by `set_handle`, or removes it from whatever set it
    /// currently belongs to (if any) when `set_handle` is zero.
    pub(crate) fn waitable_join(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable_handle: u32,
        set_handle: u32,
    ) -> Result<()> {
        let mut instance = self.id().get_mut(store);
        let waitable =
            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;

        // A zero set handle means "leave the current set"; otherwise resolve
        // the handle to the host-side set id.
        let set = if set_handle == 0 {
            None
        } else {
            let set = instance.instance_states().0[caller_instance]
                .handle_table()
                .waitable_set_rep(set_handle)?;

            Some(TableId::<WaitableSet>::new(set))
        };

        log::trace!(
            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
        );

        waitable.join(store.concurrent_state_mut(), set)
    }
3030
    /// Implements the `subtask.drop` intrinsic.
    ///
    /// Releases the caller's handle to a resolved subtask.  Traps if the
    /// subtask has not yet resolved or still has an undelivered event.
    pub(crate) fn subtask_drop(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        task_id: u32,
    ) -> Result<()> {
        // First detach the subtask from any waitable set it belongs to (a set
        // handle of zero means "remove from current set").
        self.waitable_join(store, caller_instance, task_id, 0)?;

        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_remove(task_id)?;

        let concurrent_state = store.concurrent_state_mut();
        let (waitable, expected_caller, delete) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            match &task.state {
                HostTaskState::CalleeRunning(_) => {
                    bail!("cannot drop a subtask which has not yet resolved");
                }
                HostTaskState::CalleeDone => {}
                // These states should not be observable for a subtask that's
                // visible from the guest, so panic here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(),
            }
            // Host tasks are deleted eagerly once dropped.
            (Waitable::Host(id), task.caller, true)
        } else {
            let id = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            // A present `lift_result` means the task has not yet called
            // `task.return` or `task.cancel`.
            if task.lift_result.is_some() {
                bail!("cannot drop a subtask which has not yet resolved");
            }
            if let Caller::Guest { thread } = task.caller {
                (
                    Waitable::Guest(id),
                    thread,
                    // Guest tasks are deleted here only if they've already
                    // exited; otherwise deletion happens later.
                    concurrent_state.get_mut(id)?.exited,
                )
            } else {
                // Guest-visible subtask handles always have guest callers.
                unreachable!()
            }
        };

        // Sever the handle association before (possibly) deleting the entry.
        waitable.common(concurrent_state)?.handle = None;

        if waitable.take_event(concurrent_state)?.is_some() {
            bail!("cannot drop a subtask with an undelivered event");
        }

        if delete {
            waitable.delete_from(concurrent_state)?;
        }

        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        assert_eq!(
            expected_caller,
            concurrent_state.unwrap_current_guest_thread(),
        );
        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
        Ok(())
    }
3097
3098    /// Implements the `waitable-set.wait` intrinsic.
3099    pub(crate) fn waitable_set_wait(
3100        self,
3101        store: &mut StoreOpaque,
3102        options: OptionsIndex,
3103        set: u32,
3104        payload: u32,
3105    ) -> Result<u32> {
3106        if !self.options(store, options).async_ {
3107            // The caller may only call `waitable-set.wait` from an async task
3108            // (i.e. a task created via a call to an async export).
3109            // Otherwise, we'll trap.
3110            store.check_blocking()?;
3111        }
3112
3113        let &CanonicalOptions {
3114            cancellable,
3115            instance: caller_instance,
3116            ..
3117        } = &self.id().get(store).component().env_component().options[options];
3118        let rep = store
3119            .instance_state(RuntimeInstance {
3120                instance: self.id().instance(),
3121                index: caller_instance,
3122            })
3123            .handle_table()
3124            .waitable_set_rep(set)?;
3125
3126        self.waitable_check(
3127            store,
3128            cancellable,
3129            WaitableCheck::Wait,
3130            WaitableCheckParams {
3131                set: TableId::new(rep),
3132                options,
3133                payload,
3134            },
3135        )
3136    }
3137
3138    /// Implements the `waitable-set.poll` intrinsic.
3139    pub(crate) fn waitable_set_poll(
3140        self,
3141        store: &mut StoreOpaque,
3142        options: OptionsIndex,
3143        set: u32,
3144        payload: u32,
3145    ) -> Result<u32> {
3146        let &CanonicalOptions {
3147            cancellable,
3148            instance: caller_instance,
3149            ..
3150        } = &self.id().get(store).component().env_component().options[options];
3151        let rep = store
3152            .instance_state(RuntimeInstance {
3153                instance: self.id().instance(),
3154                index: caller_instance,
3155            })
3156            .handle_table()
3157            .waitable_set_rep(set)?;
3158
3159        self.waitable_check(
3160            store,
3161            cancellable,
3162            WaitableCheck::Poll,
3163            WaitableCheckParams {
3164                set: TableId::new(rep),
3165                options,
3166                payload,
3167            },
3168        )
3169    }
3170
3171    /// Implements the `thread.index` intrinsic.
3172    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3173        let thread_id = store
3174            .concurrent_state_mut()
3175            .unwrap_current_guest_thread()
3176            .thread;
3177        // The unwrap is safe because `instance_rep` must be `Some` by this point
3178        Ok(store
3179            .concurrent_state_mut()
3180            .get_mut(thread_id)?
3181            .instance_rep
3182            .unwrap())
3183    }
3184
    /// Implements the `thread.new-indirect` intrinsic.
    ///
    /// Looks up the start function at `start_func_idx` in the function table
    /// identified by `start_func_table_idx`, validates its signature
    /// (currently only `(i32) -> ()` is supported), and creates a new,
    /// not-yet-started guest thread owned by the current task.  Returns the
    /// new thread's guest-visible handle; the thread does not run until it is
    /// resumed.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex, // currently unused
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // Validate the indirect start function: it must exist (be
        // initialized) and have exactly the `(i32) -> ()` type.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| {
                format_err!("the start function index points to an uninitialized function")
            })?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(
                "start function does not match expected type (currently only `(i32) -> ()` is supported)"
            );
        }

        // This closure is what actually runs (later) when the thread is
        // started: it invokes the guest start function with `context`, then
        // tears the thread down and restores the previously-current thread.
        let token = StoreToken::new(store.as_context_mut());
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                let old_thread = store.set_thread(guest_thread);
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // Use call_unchecked rather than call or call_async, as we don't want to run the function
                // on a separate fiber if we're running in an async store.
                //
                // SAFETY: `params` matches the `(i32) -> ()` signature which
                // `callee` was checked against above via `type_index`.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
                log::trace!("explicit thread {guest_thread:?} completed");
                // If this was the task's last thread and the task never
                // returned or cancelled, that's a trap.
                let state = store.0.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                // Restore the previous thread as current and mark it running
                // again (if it was a guest thread).
                store.0.set_thread(old_thread);
                let state = store.0.concurrent_state_mut();
                old_thread
                    .guest()
                    .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
                // Reap the owning task if this thread's exit made it
                // deletable.
                if state.get_mut(guest_thread.task)?.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        // Record the new thread as belonging to the current task.
        let state = store.0.concurrent_state_mut();
        let current_thread = state.unwrap_current_guest_thread();
        let parent_task = current_thread.task;

        let new_thread = GuestThread::new_explicit(parent_task, start_func);
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3257
    /// Schedule the specified guest thread to run (starting it first, if it
    /// has never run).
    ///
    /// `high_priority` controls where the resulting work item is placed in
    /// the event loop's queue.  When `allow_ready` is set, a thread already
    /// in the `Ready` state has its existing work item promoted rather than
    /// being re-enqueued; otherwise resuming a `Ready` (or otherwise
    /// non-suspended) thread traps.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Optimistically mark the thread `Running`; the arms below put a
        // different state back where appropriate.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                // First resume of an explicitly-created thread: enqueue a
                // guest call which will invoke its start function.
                log::trace!("starting thread {guest_thread:?}");
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                // Previously-suspended thread: enqueue its fiber to resume.
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready(fiber) if allow_ready => {
                // Already queued to run: keep the `Ready` state and promote
                // its existing work item instead of enqueueing a new one.
                log::trace!("resuming thread {thread_id:?} that was ready");
                thread.state = GuestThreadState::Ready(fiber);
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                // Not in a resumable state; restore the original state and
                // trap.
                thread.state = other;
                bail!("cannot resume thread which is not suspended");
            }
        }
        Ok(())
    }
3308
3309    fn add_guest_thread_to_instance_table(
3310        self,
3311        thread_id: TableId<GuestThread>,
3312        store: &mut StoreOpaque,
3313        runtime_instance: RuntimeComponentInstanceIndex,
3314    ) -> Result<u32> {
3315        let guest_id = store
3316            .instance_state(RuntimeInstance {
3317                instance: self.id().instance(),
3318                index: runtime_instance,
3319            })
3320            .thread_handle_table()
3321            .guest_thread_insert(thread_id.rep())?;
3322        store
3323            .concurrent_state_mut()
3324            .get_mut(thread_id)?
3325            .instance_rep = Some(guest_id);
3326        Ok(guest_id)
3327    }
3328
    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
    ///
    /// `yielding` distinguishes the yield-flavored intrinsics from the
    /// suspend-flavored ones, and `to_thread` names an optional target thread
    /// to hand control to.  When `cancellable` is set, a pending cancellation
    /// turns the call into `WaitResult::Cancelled` instead of suspending (or
    /// after resuming).
    pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: SuspensionTarget,
    ) -> Result<WaitResult> {
        let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // This is a `thread.yield` call
                if !state.may_block(guest_thread.task) {
                    // In a non-blocking context, a `thread.yield` may trigger
                    // other threads in the same component instance to run.
                    if !state.promote_instance_local_thread_work_item(caller) {
                        // No other threads are runnable, so just return
                        return Ok(WaitResult::Completed);
                    }
                }
            } else {
                // The caller may only call `thread.suspend` from an async task
                // (i.e. a task created via a call to an async export).
                // Otherwise, we'll trap.
                store.check_blocking()?;
            }
        }

        // There could be a pending cancellation from a previous uncancellable wait
        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
            return Ok(WaitResult::Cancelled);
        }

        // If a specific target thread was named, schedule it (at high
        // priority) ahead of our own suspension.
        match to_thread {
            SuspensionTarget::SomeSuspended(thread) => {
                // `-to-suspended` variant: the target must be suspended, so
                // don't allow resuming a `Ready` thread.
                self.resume_thread(store, caller, thread, true, false)?
            }
            SuspensionTarget::Some(thread) => {
                self.resume_thread(store, caller, thread, true, true)?
            }
            SuspensionTarget::None => { /* nothing to do */ }
        }

        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.yield-to-suspended` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // We may have been woken up by a cancellation request rather than a
        // normal resume; report that to the caller if it can handle it.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }
3401
    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
    ///
    /// For `Wait`, suspends the current thread until an event is available;
    /// for `Poll`, never suspends.  In either case any deliverable event is
    /// written as a `(handle, result)` pair to the guest memory location
    /// given by `params.payload`, and the event's ordinal is returned.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
        params: WaitableCheckParams,
    ) -> Result<u32> {
        let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();

        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        // If we're waiting, and there are no events immediately available,
        // suspend the fiber until that changes.
        match &check {
            WaitableCheck::Wait => {
                let set = params.set;

                // Note that a pending `Event::Cancelled` only counts as
                // "available" when this wait is cancellable.
                if (task.event.is_none()
                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                    && state.get_mut(set)?.ready.is_empty()
                {
                    if cancellable {
                        // Record the set so a cancellation request can wake
                        // this thread while it waits.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        assert!(old.is_none());
                    }

                    store.suspend(SuspendReason::Waiting {
                        set,
                        thread: guest_thread,
                        skip_may_block_check: false,
                    })?;
                }
            }
            WaitableCheck::Poll => {}
        }

        log::trace!(
            "waitable check for {guest_thread:?}; set {:?}, part two",
            params.set
        );

        // Deliver any pending events to the guest and return.
        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

        let (ordinal, handle, result) = match &check {
            WaitableCheck::Wait => {
                // After a wait, an event must be available, hence the unwrap.
                let (event, waitable) = event.unwrap();
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            }
            WaitableCheck::Poll => {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    // Nothing ready; report `Event::None` to the guest.
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            }
        };
        // Write the `(handle, result)` pair as two little-endian u32s at the
        // guest-provided (and bounds-checked) payload address.
        let memory = self.options_memory_mut(store, params.options);
        let ptr = func::validate_inbounds_dynamic(
            &CanonicalAbiInfo::POINTER_PAIR,
            memory,
            &ValRaw::u32(params.payload),
        )?;
        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
        Ok(ordinal)
    }
3486
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// Requests cancellation of the subtask identified by `task_id`.  In
    /// `async_` mode this may return `BLOCKED` if the subtask has not yet
    /// reached a terminal status; otherwise it blocks until a terminal status
    /// (`Returned` or `ReturnCancelled`) is available and returns it.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        // Resolve the guest handle to either a host task or guest task.
        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_rep(task_id)?;
        let (waitable, expected_caller) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller,
            )
        } else {
            let id = TableId::<GuestTask>::new(rep);
            if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
                (Waitable::Guest(id), thread)
            } else {
                // Guest-visible subtask handles always have guest callers.
                unreachable!()
            }
        };
        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        let concurrent_state = store.concurrent_state_mut();
        assert_eq!(
            expected_caller,
            concurrent_state.unwrap_current_guest_thread(),
        );

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            match mem::replace(state, HostTaskState::CalleeDone) {
                // If the callee is still running, signal an abort is requested.
                // Then fall through to determine what to do next.
                HostTaskState::CalleeRunning(handle) => handle.abort(),

                // Cancellation was already requested, so fail as the task can't
                // be cancelled twice.
                HostTaskState::CalleeDone => {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }

                // These states should not be possible for a subtask that's
                // visible from the guest, so panic here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(),
            }

            // Cancelling host tasks always needs to block on them to await the
            // result of the completion set up in `first_poll`. This'll resolve
            // the race of `handle.abort()` above to see if it actually
            // cancelled something or if the future ended up finishing.
            needs_block = true;
        } else {
            let caller = concurrent_state.unwrap_current_guest_thread();
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // The task is in a `starting` state, meaning it hasn't run at
                // all yet.  Here we update its fields to indicate that it is
                // ready to delete immediately once `subtask.drop` is called.
                task.lower_params = None;
                task.lift_result = None;
                task.exited = true;

                let instance = task.instance;

                // A not-yet-started task has exactly its one initial thread;
                // delete it.
                assert_eq!(1, task.threads.len());
                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
                let concurrent_state = store.concurrent_state_mut();
                concurrent_state.delete(thread)?;
                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

                // Not yet started; cancel and remove from pending
                let pending = &mut store.instance_state(instance).concurrent_state().pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // If there were no pending threads for this task, we're in an error state
                if pending.len() == pending_count {
                    bail!("`subtask.cancel` called after terminal status delivered");
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // If any of the task's threads parked itself with
                // `wake_on_cancel`, wake the first such thread so it can
                // observe the cancellation, then yield to let it run.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)
                        .unwrap()
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state
                            .get_mut(set)?
                            .waiting
                            .remove(&thread)
                            .unwrap()
                        {
                            WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                            WaitMode::Callback(instance) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Guest tasks need to block if they have not yet returned or
                // cancelled, even as a result of the event delivery above.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                needs_block = false;
            }
        };

        // If we need to block waiting on the terminal status of this subtask
        // then return immediately in `async` mode, or otherwise wait for the
        // event to get signaled through the store.
        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done fall through to the shared event-handling logic
            // below.
            store.wait_for_event(waitable)?;

            // .. fall through to determine what event's in store for us.
        }

        // Only a terminal status is an acceptable outcome here; anything else
        // means the terminal status was already delivered earlier.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!("`subtask.cancel` called after terminal status delivered");
        }
    }
3675
3676    pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3677        store.concurrent_state_mut().context_get(slot)
3678    }
3679
3680    pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3681        store.concurrent_state_mut().context_set(slot, value)
3682    }
3683}
3684
3685/// Trait representing component model ABI async intrinsics and fused adapter
3686/// helper functions.
3687///
3688/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3689/// which must be valid for at least the duration of the call (and possibly for
3690/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3691/// pointers used for async calls).
3692pub trait VMComponentAsyncStore {
3693    /// A helper function for fused adapter modules involving calls where the
3694    /// one of the caller or callee is async.
3695    ///
3696    /// This helper is not used when the caller and callee both use the sync
3697    /// ABI, only when at least one is async is this used.
3698    unsafe fn prepare_call(
3699        &mut self,
3700        instance: Instance,
3701        memory: *mut VMMemoryDefinition,
3702        start: *mut VMFuncRef,
3703        return_: *mut VMFuncRef,
3704        caller_instance: RuntimeComponentInstanceIndex,
3705        callee_instance: RuntimeComponentInstanceIndex,
3706        task_return_type: TypeTupleIndex,
3707        callee_async: bool,
3708        string_encoding: u8,
3709        result_count: u32,
3710        storage: *mut ValRaw,
3711        storage_len: usize,
3712    ) -> Result<()>;
3713
3714    /// A helper function for fused adapter modules involving calls where the
3715    /// caller is sync-lowered but the callee is async-lifted.
3716    unsafe fn sync_start(
3717        &mut self,
3718        instance: Instance,
3719        callback: *mut VMFuncRef,
3720        callee: *mut VMFuncRef,
3721        param_count: u32,
3722        storage: *mut MaybeUninit<ValRaw>,
3723        storage_len: usize,
3724    ) -> Result<()>;
3725
3726    /// A helper function for fused adapter modules involving calls where the
3727    /// caller is async-lowered.
3728    unsafe fn async_start(
3729        &mut self,
3730        instance: Instance,
3731        callback: *mut VMFuncRef,
3732        post_return: *mut VMFuncRef,
3733        callee: *mut VMFuncRef,
3734        param_count: u32,
3735        result_count: u32,
3736        flags: u32,
3737    ) -> Result<u32>;
3738
3739    /// The `future.write` intrinsic.
3740    fn future_write(
3741        &mut self,
3742        instance: Instance,
3743        caller: RuntimeComponentInstanceIndex,
3744        ty: TypeFutureTableIndex,
3745        options: OptionsIndex,
3746        future: u32,
3747        address: u32,
3748    ) -> Result<u32>;
3749
3750    /// The `future.read` intrinsic.
3751    fn future_read(
3752        &mut self,
3753        instance: Instance,
3754        caller: RuntimeComponentInstanceIndex,
3755        ty: TypeFutureTableIndex,
3756        options: OptionsIndex,
3757        future: u32,
3758        address: u32,
3759    ) -> Result<u32>;
3760
3761    /// The `future.drop-writable` intrinsic.
3762    fn future_drop_writable(
3763        &mut self,
3764        instance: Instance,
3765        ty: TypeFutureTableIndex,
3766        writer: u32,
3767    ) -> Result<()>;
3768
3769    /// The `stream.write` intrinsic.
3770    fn stream_write(
3771        &mut self,
3772        instance: Instance,
3773        caller: RuntimeComponentInstanceIndex,
3774        ty: TypeStreamTableIndex,
3775        options: OptionsIndex,
3776        stream: u32,
3777        address: u32,
3778        count: u32,
3779    ) -> Result<u32>;
3780
3781    /// The `stream.read` intrinsic.
3782    fn stream_read(
3783        &mut self,
3784        instance: Instance,
3785        caller: RuntimeComponentInstanceIndex,
3786        ty: TypeStreamTableIndex,
3787        options: OptionsIndex,
3788        stream: u32,
3789        address: u32,
3790        count: u32,
3791    ) -> Result<u32>;
3792
3793    /// The "fast-path" implementation of the `stream.write` intrinsic for
3794    /// "flat" (i.e. memcpy-able) payloads.
3795    fn flat_stream_write(
3796        &mut self,
3797        instance: Instance,
3798        caller: RuntimeComponentInstanceIndex,
3799        ty: TypeStreamTableIndex,
3800        options: OptionsIndex,
3801        payload_size: u32,
3802        payload_align: u32,
3803        stream: u32,
3804        address: u32,
3805        count: u32,
3806    ) -> Result<u32>;
3807
3808    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3809    /// (i.e. memcpy-able) payloads.
3810    fn flat_stream_read(
3811        &mut self,
3812        instance: Instance,
3813        caller: RuntimeComponentInstanceIndex,
3814        ty: TypeStreamTableIndex,
3815        options: OptionsIndex,
3816        payload_size: u32,
3817        payload_align: u32,
3818        stream: u32,
3819        address: u32,
3820        count: u32,
3821    ) -> Result<u32>;
3822
3823    /// The `stream.drop-writable` intrinsic.
3824    fn stream_drop_writable(
3825        &mut self,
3826        instance: Instance,
3827        ty: TypeStreamTableIndex,
3828        writer: u32,
3829    ) -> Result<()>;
3830
3831    /// The `error-context.debug-message` intrinsic.
3832    fn error_context_debug_message(
3833        &mut self,
3834        instance: Instance,
3835        ty: TypeComponentLocalErrorContextTableIndex,
3836        options: OptionsIndex,
3837        err_ctx_handle: u32,
3838        debug_msg_address: u32,
3839    ) -> Result<()>;
3840
3841    /// The `thread.new-indirect` intrinsic
3842    fn thread_new_indirect(
3843        &mut self,
3844        instance: Instance,
3845        caller: RuntimeComponentInstanceIndex,
3846        func_ty_idx: TypeFuncIndex,
3847        start_func_table_idx: RuntimeTableIndex,
3848        start_func_idx: u32,
3849        context: i32,
3850    ) -> Result<u32>;
3851}
3852
/// SAFETY: See trait docs.
///
/// This implementation delegates each intrinsic to the corresponding method on
/// `Instance`, wrapping `self` in a `StoreContextMut` as needed.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // Copy the raw parameters out of `storage` up front so they remain
        // available after control returns to the adapter.
        //
        // SAFETY: The `wasmtime_cranelift`-generated code that calls
        // this method will have ensured that `storage` is a valid
        // pointer containing at least `storage_len` items.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // Decode the caller info: the two sentinel values indicate an
                // async-lowered caller (without or with a result); any other
                // value is the result count of a sync-lowered caller.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    // No post-return funcref is supplied on this path.
                    ptr::null_mut(),
                    callee,
                    param_count,
                    // Result count (see `async_start` for the parameter
                    // positions of `start_call`).
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
                    // this method will have ensured that `storage` is a valid
                    // pointer containing at least `storage_len` items.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                // Discard the `u32` status `start_call` produces; this method
                // returns `()` on success.
                .map(drop)
        }
    }

    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                // Unlike `sync_start`, no caller-provided result storage is
                // passed here.
                None,
            )
        }
    }

    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                // No flat-ABI fast path info for futures.
                None,
                future,
                address,
                // Count is hard-coded to `1` for futures (streams pass the
                // guest-supplied count instead; see `stream_write`).
                1,
            )
            // Pack the write result into the `u32` ABI return value.
            .map(|result| result.encode())
    }

    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                // No flat-ABI fast path info for futures.
                None,
                future,
                address,
                // Count is hard-coded to `1` for futures, as in `future_write`.
                1,
            )
            // Pack the read result into the `u32` ABI return value.
            .map(|result| result.encode())
    }

    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // `None` selects the general (non-flat) payload path; compare
                // `flat_stream_write`.
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // `None` selects the general (non-flat) payload path; compare
                // `flat_stream_read`.
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // Providing the payload layout enables the memcpy fast path
                // for "flat" payloads (see trait docs).
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // Providing the payload layout enables the memcpy fast path
                // for "flat" payloads (see trait docs).
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4162
4163type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4164
/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
struct HostTask {
    /// State common to all waitables (pending event, owning set, guest-visible
    /// handle); see `WaitableCommon`.
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Where this task currently is in its lifecycle; see `HostTaskState`.
    state: HostTaskState,
}
4180
/// Lifecycle state of a `HostTask`.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block` which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
    CalleeDone,
}
4203
4204impl HostTask {
4205    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4206        Self {
4207            common: WaitableCommon::default(),
4208            call_context: CallContext::default(),
4209            caller,
4210            state,
4211        }
4212    }
4213}
4214
impl TableDebug for HostTask {
    /// Human-readable type name used when formatting table entries.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
4220
4221type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4222
/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to an entirely unrelated
        /// top-level component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling thread.
        thread: QualifiedThreadId,
    },
}
4243
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// Type-erased closure which lifts the result value.
    lift: RawLift,
    /// The expected result type for the `task.return` call.
    ty: TypeTupleIndex,
    /// The linear memory to lift from, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// The string encoding to use when lifting.
    string_encoding: StringEncoding,
}
4252
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task which owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4262
4263impl QualifiedThreadId {
4264    fn qualify(
4265        state: &mut ConcurrentState,
4266        thread: TableId<GuestThread>,
4267    ) -> Result<QualifiedThreadId> {
4268        Ok(QualifiedThreadId {
4269            task: state.get_mut(thread)?.parent_task,
4270            thread,
4271        })
4272    }
4273}
4274
4275impl fmt::Debug for QualifiedThreadId {
4276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4277        f.debug_tuple("QualifiedThreadId")
4278            .field(&self.task.rep())
4279            .field(&self.thread.rep())
4280            .finish()
4281    }
4282}
4283
/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// Thread created via `GuestThread::new_implicit`; has not yet started
    /// running.
    NotStartedImplicit,
    /// Thread created via `GuestThread::new_explicit`; has not yet started
    /// running. Holds the start closure to run when it does.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is currently running.
    Running,
    /// The thread is suspended on the contained fiber.
    Suspended(StoreFiber<'static>),
    /// The thread is ready to resume execution on the contained fiber.
    Ready(StoreFiber<'static>),
    /// The thread has finished executing.
    Completed,
}
/// Represents a single guest thread belonging to a guest task.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread; see `GuestThreadState`.
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
}
4309
4310impl GuestThread {
4311    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4312    /// handle.
4313    fn from_instance(
4314        state: Pin<&mut ComponentInstance>,
4315        caller_instance: RuntimeComponentInstanceIndex,
4316        guest_thread: u32,
4317    ) -> Result<TableId<Self>> {
4318        let rep = state.instance_states().0[caller_instance]
4319            .thread_handle_table()
4320            .guest_thread_rep(guest_thread)?;
4321        Ok(TableId::new(rep))
4322    }
4323
4324    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4325        Self {
4326            context: [0; 2],
4327            parent_task,
4328            wake_on_cancel: None,
4329            state: GuestThreadState::NotStartedImplicit,
4330            instance_rep: None,
4331        }
4332    }
4333
4334    fn new_explicit(
4335        parent_task: TableId<GuestTask>,
4336        start_func: Box<
4337            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4338        >,
4339    ) -> Self {
4340        Self {
4341            context: [0; 2],
4342            parent_task,
4343            wake_on_cancel: None,
4344            state: GuestThreadState::NotStartedExplicit(start_func),
4345            instance_rep: None,
4346        }
4347    }
4348}
4349
impl TableDebug for GuestThread {
    /// Human-readable type name used when formatting table entries.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4355
/// Slot holding the lowered result of a sync-to-async call (see
/// `GuestTask::sync_result`).
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result was produced (possibly with no core value) and has not yet
    /// been taken.
    Produced(Option<ValRaw>),
    /// The result was already taken via `SyncResult::take`.
    Taken,
}
4361
4362impl SyncResult {
4363    fn take(&mut self) -> Option<Option<ValRaw>> {
4364        match mem::replace(self, SyncResult::Taken) {
4365            SyncResult::NotProduced => None,
4366            SyncResult::Produced(val) => Some(val),
4367            SyncResult::Taken => {
4368                panic!("attempted to take a synchronous result that was already taken")
4369            }
4370        }
4371    }
4372}
4373
/// State of the host future representing an async call into the guest (see
/// `GuestTask::host_future_state`).
#[derive(Debug)]
enum HostFutureState {
    /// No host future is associated with this task.
    NotApplicable,
    /// A host future exists and has not yet been dropped; the task cannot be
    /// deleted until it is.
    Live,
    /// The host future has been dropped.
    Dropped,
}
4380
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// State common to all waitables (pending event, owning set, guest-visible
    /// handle); see `WaitableCommon`.
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task; reset to `None`
    /// once lowering has happened.
    lower_params: Option<RawLower>,
    /// See `LiftResult`; reset to `None` once the task returns or is
    /// cancelled.
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
    ///
    /// Keeps track of `borrow<T>` received to this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,
}
4434
impl GuestTask {
    /// Whether the parameters for this task have already been lowered.
    fn already_lowered_parameters(&self) -> bool {
        // We reset `lower_params` after we lower the parameters
        self.lower_params.is_none()
    }

    /// Whether this task has reached a terminal status (returned or
    /// cancelled).
    fn returned_or_cancelled(&self) -> bool {
        // We reset `lift_result` after we return or exit
        self.lift_result.is_none()
    }

    /// Whether this task may be deleted from the table.
    ///
    /// A task may only be deleted once all its threads have completed, any
    /// produced-but-untaken sync result has been consumed, any pending
    /// terminal completion event has been delivered, and any live host future
    /// has been dropped.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        // A produced-but-not-yet-taken sync result blocks deletion.
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        // An undelivered terminal completion event also blocks deletion.
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }

    /// Create a new `GuestTask`, allocating its scratch waitable set in
    /// `state`.
    ///
    /// See the struct fields of the same names for the meaning of each
    /// parameter.
    fn new(
        state: &mut ConcurrentState,
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option<CallbackFn>,
        instance: RuntimeInstance,
        async_function: bool,
    ) -> Result<Self> {
        let sync_call_set = state.push(WaitableSet::default())?;
        // Only host-initiated calls can have an associated host future; for
        // guest-initiated calls the state is `NotApplicable`.
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        Ok(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: CallContext::default(),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            sync_call_set,
            instance,
            event: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
            async_function,
        })
    }

    /// Dispose of this guest task, reparenting any pending subtasks to the
    /// caller.
    fn dispose(self, state: &mut ConcurrentState) -> Result<()> {
        // If there are not-yet-delivered completion events for subtasks in
        // `self.sync_call_set`, recursively dispose of those subtasks as well.
        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        // All threads must have completed (and been removed) before disposal.
        assert!(self.threads.is_empty());

        state.delete(self.sync_call_set)?;

        Ok(())
    }
}
4534
impl TableDebug for GuestTask {
    /// Human-readable type name used when formatting table entries.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4540
/// Represents state common to all kinds of waitables.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4551
/// Represents a Component Model Async `waitable`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4562
4563impl Waitable {
4564    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4565    /// handle.
4566    fn from_instance(
4567        state: Pin<&mut ComponentInstance>,
4568        caller_instance: RuntimeComponentInstanceIndex,
4569        waitable: u32,
4570    ) -> Result<Self> {
4571        use crate::runtime::vm::component::Waitable;
4572
4573        let (waitable, kind) = state.instance_states().0[caller_instance]
4574            .handle_table()
4575            .waitable_rep(waitable)?;
4576
4577        Ok(match kind {
4578            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4579            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4580            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4581        })
4582    }
4583
4584    /// Retrieve the host-visible identifier for this `Waitable`.
4585    fn rep(&self) -> u32 {
4586        match self {
4587            Self::Host(id) => id.rep(),
4588            Self::Guest(id) => id.rep(),
4589            Self::Transmit(id) => id.rep(),
4590        }
4591    }
4592
    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
    /// remove it from any set it may currently belong to (when `set` is
    /// `None`).
    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
        log::trace!("waitable {self:?} join set {set:?}",);

        // Record the new set, remembering the old one (if any) so we can
        // detach from it below.
        let old = mem::replace(&mut self.common(state)?.set, set);

        if let Some(old) = old {
            // Detach from the previous set: remove the parent/child link and
            // drop this waitable from that set's ready list.
            match *self {
                Waitable::Host(id) => state.remove_child(id, old),
                Waitable::Guest(id) => state.remove_child(id, old),
                Waitable::Transmit(id) => state.remove_child(id, old),
            }?;

            state.get_mut(old)?.ready.remove(self);
        }

        if let Some(set) = set {
            // Attach to the new set, and if an event is already pending,
            // immediately mark this waitable ready for delivery.
            match *self {
                Waitable::Host(id) => state.add_child(id, set),
                Waitable::Guest(id) => state.add_child(id, set),
                Waitable::Transmit(id) => state.add_child(id, set),
            }?;

            if self.common(state)?.event.is_some() {
                self.mark_ready(state)?;
            }
        }

        Ok(())
    }
4625
4626    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4627    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4628        Ok(match self {
4629            Self::Host(id) => &mut state.get_mut(*id)?.common,
4630            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4631            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4632        })
4633    }
4634
4635    /// Set or clear the pending event for this waitable and either deliver it
4636    /// to the first waiter, if any, or mark it as ready to be delivered to the
4637    /// next waiter that arrives.
4638    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4639        log::trace!("set event for {self:?}: {event:?}");
4640        self.common(state)?.event = event;
4641        self.mark_ready(state)
4642    }
4643
4644    /// Take the pending event from this waitable, leaving `None` in its place.
4645    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4646        let common = self.common(state)?;
4647        let event = common.event.take();
4648        if let Some(set) = self.common(state)?.set {
4649            state.get_mut(set)?.ready.remove(self);
4650        }
4651
4652        Ok(event)
4653    }
4654
4655    /// Deliver the current event for this waitable to the first waiter, if any,
4656    /// or else mark it as ready to be delivered to the next waiter that
4657    /// arrives.
4658    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4659        if let Some(set) = self.common(state)?.set {
4660            state.get_mut(set)?.ready.insert(*self);
4661            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4662                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4663                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4664
4665                let item = match mode {
4666                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4667                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4668                        state.get_mut(thread.task)?.instance.index,
4669                        GuestCall {
4670                            thread,
4671                            kind: GuestCallKind::DeliverEvent {
4672                                instance,
4673                                set: Some(set),
4674                            },
4675                        },
4676                    ),
4677                };
4678                state.push_high_priority(item);
4679            }
4680        }
4681        Ok(())
4682    }
4683
4684    /// Remove this waitable from the instance's rep table.
4685    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4686        match self {
4687            Self::Host(task) => {
4688                log::trace!("delete host task {task:?}");
4689                state.delete(*task)?;
4690            }
4691            Self::Guest(task) => {
4692                log::trace!("delete guest task {task:?}");
4693                state.delete(*task)?.dispose(state)?;
4694            }
4695            Self::Transmit(task) => {
4696                state.delete(*task)?;
4697            }
4698        }
4699
4700        Ok(())
4701    }
4702}
4703
4704impl fmt::Debug for Waitable {
4705    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4706        match self {
4707            Self::Host(id) => write!(f, "{id:?}"),
4708            Self::Guest(id) => write!(f, "{id:?}"),
4709            Self::Transmit(id) => write!(f, "{id:?}"),
4710        }
4711    }
4712}
4713
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ///
    /// Maintained by `Waitable::mark_ready`, `Waitable::take_event`, and
    /// `Waitable::join`.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any, along
    /// with how each should be resumed (fiber vs. callback).
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4722
impl TableDebug for WaitableSet {
    /// Name used when logging/tracing table entries of this type.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4728
/// Type-erased closure to lower the parameters for a guest task.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Type-erased closure to lift the result for a guest task.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
struct DummyResult;
4746
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Whether backpressure is set for this instance (enabled if >0)
    backpressure: u16,
    /// Whether this instance is currently barred from being (re-)entered.
    do_not_enter: bool,
    /// Pending calls for this instance which are deferred while
    /// `Self::backpressure` is enabled (i.e. nonzero) and/or
    /// `Self::do_not_enter` is set; they can proceed once those conditions
    /// clear.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4758
impl ConcurrentInstanceState {
    /// Returns whether this instance has no pending (deferred) guest calls.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4764
/// Identifies which kind of thread (if any) the store's event loop is
/// currently running.
#[derive(Debug, Copy, Clone)]
enum CurrentThread {
    /// A guest thread (qualified by its owning task) is running.
    Guest(QualifiedThreadId),
    /// A host task is running.
    Host(TableId<HostTask>),
    /// No thread is currently running.
    None,
}
4771
4772impl CurrentThread {
4773    fn guest(&self) -> Option<&QualifiedThreadId> {
4774        match self {
4775            Self::Guest(id) => Some(id),
4776            _ => None,
4777        }
4778    }
4779
4780    fn host(&self) -> Option<TableId<HostTask>> {
4781        match self {
4782            Self::Host(id) => Some(*id),
4783            _ => None,
4784        }
4785    }
4786
4787    fn is_none(&self) -> bool {
4788        matches!(self, Self::None)
4789    }
4790}
4791
4792impl From<QualifiedThreadId> for CurrentThread {
4793    fn from(id: QualifiedThreadId) -> Self {
4794        Self::Guest(id)
4795    }
4796}
4797
4798impl From<TableId<HostTask>> for CurrentThread {
4799    fn from(id: TableId<HostTask>) -> Self {
4800        Self::Host(id)
4801    }
4802}
4803
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running thread, if any.
    current_thread: CurrentThread,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `ComponentInstance::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    ///
    /// Items are pushed to the front and popped from the back, i.e. FIFO.
    low_priority: VecDeque<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4846
4847impl Default for ConcurrentState {
4848    fn default() -> Self {
4849        Self {
4850            current_thread: CurrentThread::None,
4851            table: AlwaysMut::new(ResourceTable::new()),
4852            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4853            high_priority: Vec::new(),
4854            low_priority: VecDeque::new(),
4855            suspend_reason: None,
4856            worker: None,
4857            worker_item: None,
4858            global_error_context_ref_counts: BTreeMap::new(),
4859        }
4860    }
4861}
4862
impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.  This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Collect fibers parked in waitable sets as well as those belonging
        // to suspended or ready-but-not-yet-resumed guest threads.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        // Also collect the cached worker fiber, if present.
        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Drain both work queues, salvaging any fibers and queued futures
        // they contain; other work item kinds are simply dropped.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            _ => {}
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }

    /// Collect the next set of work items to run. This will be either all
    /// high-priority items, or a single low-priority item if there are no
    /// high-priority items.
    fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
        let mut ready = mem::take(&mut self.high_priority);
        if ready.is_empty() {
            // `push_low_priority` pushes to the front, so popping from the
            // back yields the oldest item (FIFO order).
            if let Some(item) = self.low_priority.pop_back() {
                ready.push(item);
            }
        }
        ready
    }

    /// Insert `value` into the store-wide resource table, returning a typed
    /// id for it.
    fn push<V: Send + Sync + 'static>(
        &mut self,
        value: V,
    ) -> Result<TableId<V>, ResourceTableError> {
        self.table.get_mut().push(value).map(TableId::from)
    }

    /// Retrieve mutable access to the table entry identified by `id`.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
        self.table.get_mut().get_mut(&Resource::from(id))
    }

    /// Record a parent/child relationship between two table entries.
    pub fn add_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .add_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove a parent/child relationship previously recorded with
    /// `Self::add_child`.
    pub fn remove_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .remove_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove the entry identified by `id` from the table, returning it.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
        self.table.get_mut().delete(Resource::from(id))
    }

    /// Queue `future` to be added to `Self::futures` by the event loop.
    fn push_future(&mut self, future: HostTaskFuture) {
        // Note that we can't directly push to `ConcurrentState::futures` here
        // since this may be called from a future that's being polled inside
        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
        // so it has exclusive access while polling it.  Therefore, we push a
        // work item to the "high priority" queue, which will actually push to
        // `ConcurrentState::futures` later.
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }

    /// Add `item` to the "high priority" work queue.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }

    /// Add `item` to the "low priority" work queue.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push_front(item);
    }

    /// Add `item` to either the high- or low-priority work queue per the
    /// `high_priority` parameter.
    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
        if high_priority {
            self.push_high_priority(item);
        } else {
            self.push_low_priority(item);
        }
    }

    /// Promote (if necessary) a queued thread work item belonging to
    /// `current_instance`, returning whether any such item was found.
    fn promote_instance_local_thread_work_item(
        &mut self,
        current_instance: RuntimeComponentInstanceIndex,
    ) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
                *instance == current_instance
            }
            _ => false,
        })
    }

    /// Promote (if necessary) a queued work item for the specified `thread`,
    /// returning whether any such item was found.
    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
                *t == thread
            }
            _ => false,
        })
    }

    /// Promote the first low-priority work item matching `predicate` to the
    /// high-priority queue (unless a matching high-priority item already
    /// exists), returning whether a matching item was found in either queue.
    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
    where
        F: FnMut(&WorkItem) -> bool,
    {
        // If there's a high-priority work item to resume the current guest thread,
        // we don't need to promote anything, but we return true to indicate that
        // work is pending for the current instance.
        if self.high_priority.iter().any(&mut predicate) {
            true
        }
        // Otherwise, look for a low-priority work item that matches the current
        // instance and promote it to high-priority.
        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
            let item = self.low_priority.remove(idx).unwrap();
            self.push_high_priority(item);
            true
        } else {
            false
        }
    }

    /// Implements the `context.get` intrinsic.
    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
        let thread = self.unwrap_current_guest_thread();
        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
        Ok(val)
    }

    /// Implements the `context.set` intrinsic.
    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
        let thread = self.unwrap_current_guest_thread();
        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
        Ok(())
    }

    /// Returns whether there's a pending cancellation on the current guest thread,
    /// consuming the event if so.
    fn take_pending_cancellation(&mut self) -> bool {
        let thread = self.unwrap_current_guest_thread();
        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
            // The only event expected to be stored directly on a task here is
            // a cancellation request.
            assert!(matches!(event, Event::Cancelled));
            true
        } else {
            false
        }
    }

    /// Trap with `CannotBlockSyncTask` unless `task` is currently allowed to
    /// block; see `Self::may_block`.
    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
        if self.may_block(task) {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// Returns whether `task` may block: tasks for async functions always
    /// may; otherwise only once the task has returned or been cancelled.
    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
        let task = self.get_mut(task).unwrap();
        task.async_function || task.returned_or_cancelled()
    }

    /// Used by `ResourceTables` to acquire the current `CallContext` for the
    /// specified task.
    ///
    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
    /// below.
    pub fn call_context(&mut self, task: u32) -> &mut CallContext {
        // Low bit distinguishes host tasks from guest tasks; the remaining
        // bits are the table rep.
        let (task, is_host) = (task >> 1, task & 1 == 1);
        if is_host {
            let task: TableId<HostTask> = TableId::new(task);
            &mut self.get_mut(task).unwrap().call_context
        } else {
            let task: TableId<GuestTask> = TableId::new(task);
            &mut self.get_mut(task).unwrap().call_context
        }
    }

    /// Used by `ResourceTables` to record the scope of a borrow to get undone
    /// in the future.
    pub fn current_call_context_scope_id(&self) -> u32 {
        let (bits, is_host) = match self.current_thread {
            CurrentThread::Guest(id) => (id.task.rep(), false),
            CurrentThread::Host(id) => (id.rep(), true),
            CurrentThread::None => unreachable!(),
        };
        // The rep must fit in 31 bits since the low bit carries the
        // host-vs-guest flag.
        assert_eq!((bits << 1) >> 1, bits);
        (bits << 1) | u32::from(is_host)
    }

    /// Returns the current guest thread, panicking if the current thread is
    /// not a guest thread.
    fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
        *self.current_thread.guest().unwrap()
    }

    /// Returns the current host task, panicking if the current thread is not
    /// a host task.
    fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
        self.current_thread.host().unwrap()
    }
}
5126
5127/// Provide a type hint to compiler about the shape of a parameter lower
5128/// closure.
5129fn for_any_lower<
5130    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5131>(
5132    fun: F,
5133) -> F {
5134    fun
5135}
5136
5137/// Provide a type hint to compiler about the shape of a result lift closure.
5138fn for_any_lift<
5139    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5140>(
5141    fun: F,
5142) -> F {
5143    fun
5144}
5145
/// Wrap the specified future in a `poll_fn` which asserts that the future is
/// only polled from the event loop of the specified `Store`.
///
/// See `StoreContextMut::run_concurrent` for details.
fn checked<F: Future + Send + 'static>(
    id: StoreId,
    fut: F,
) -> impl Future<Output = F::Output> + Send + 'static {
    async move {
        let mut fut = pin!(fut);
        future::poll_fn(move |cx| {
            let message = "\
                `Future`s which depend on asynchronous component tasks, streams, or \
                futures to complete may only be polled from the event loop of the \
                store to which they belong.  Please use \
                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
            ";
            // On every poll, verify via TLS that the store currently running
            // is the one this future belongs to; otherwise panic with the
            // explanatory message above.
            tls::try_get(|store| {
                let matched = match store {
                    tls::TryGet::Some(store) => store.id() == id,
                    tls::TryGet::Taken | tls::TryGet::None => false,
                };

                if !matched {
                    panic!("{message}")
                }
            });
            fut.as_mut().poll(cx)
        })
        .await
    }
}
5178
5179/// Assert that `StoreContextMut::run_concurrent` has not been called from
5180/// within an store's event loop.
5181fn check_recursive_run() {
5182    tls::try_get(|store| {
5183        if !matches!(store, tls::TryGet::None) {
5184            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5185        }
5186    });
5187}
5188
/// Split a guest callback return code into its discriminant (the low four
/// bits) and its payload (the remaining high bits).
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let discriminant = code & 0xF;
    let payload = code >> 4;
    (discriminant, payload)
}
5192
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Index of the canonical ABI options in effect for the call.
    options: OptionsIndex,
    /// Guest-provided payload argument; presumably a linear-memory address
    /// for the event result — confirm against `waitable_check`.
    payload: u32,
}
5201
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The call implements `waitable-set.wait`.
    Wait,
    /// The call implements `waitable-set.poll`.
    Poll,
}
5208
/// Represents a guest task called from the host, prepared using `prepare_call`.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    // Carries the host-visible result type `R` without storing a value of it.
    _phantom: PhantomData<R>,
}
5222
5223impl<R> PreparedCall<R> {
5224    /// Get a copy of the `TaskId` for this `PreparedCall`.
5225    pub(crate) fn task_id(&self) -> TaskId {
5226        TaskId {
5227            task: self.thread.task,
5228        }
5229    }
5230}
5231
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task this handle refers to.
    task: TableId<GuestTask>,
}
5236
impl TaskId {
    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
    /// and can be resumed by other tasks for this component, so we mark the future as dropped
    /// and delete the task when all threads are done.
    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
        if !task.already_lowered_parameters() {
            // Parameters were never lowered, so the lowering closure must
            // never run: delete the task immediately.
            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
        } else {
            // Mark the host future as gone and defer deletion until all of
            // the task's threads have finished (deleting now if they already
            // have).
            task.host_future_state = HostFutureState::Dropped;
            if task.ready_to_delete() {
                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
            }
        }
        Ok(())
    }
}
5256
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Resolve the canonical ABI options and function type for the export
    // being called.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // Channel over which the lifted result will eventually be delivered to
    // the host caller.
    let (tx, rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    let caller = state.current_thread;
    // Create the guest task, packaging the type-erased lower/lift closures
    // and (if present) the guest's callback function.
    let task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // Register the new task along with its initial (implicit) thread.
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);

    // Refuse to prepare the call if the target instance may not currently be
    // entered.
    if !store.0.may_enter(instance) {
        bail!(crate::Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        _phantom: PhantomData,
    })
}
5348
5349/// Queue a call previously prepared using `prepare_call` to be run as part of
5350/// the associated `ComponentInstance`'s event loop.
5351///
5352/// The returned future will resolve to the result once it is available, but
5353/// must only be polled via the instance's event loop. See
5354/// `StoreContextMut::run_concurrent` for details.
5355pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5356    mut store: StoreContextMut<T>,
5357    prepared: PreparedCall<R>,
5358) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5359    let PreparedCall {
5360        handle,
5361        thread,
5362        param_count,
5363        rx,
5364        ..
5365    } = prepared;
5366
5367    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5368
5369    Ok(checked(
5370        store.0.id(),
5371        rx.map(move |result| {
5372            result
5373                .map(|v| *v.downcast().unwrap())
5374                .map_err(crate::Error::from)
5375        }),
5376    ))
5377}
5378
/// Queue a call previously prepared using `prepare_call` to be run as part of
/// the associated `ComponentInstance`'s event loop.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    // Resolve the raw core-Wasm entry points for the lifted function and its
    // optional `post-return` and callback functions.
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
    // (with signatures appropriate for this call) and will remain valid as
    // long as this instance is valid.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}