Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::bail_bug;
54use crate::component::func::{self, Func, call_post_return};
55use crate::component::{
56    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57};
58use crate::fiber::{self, StoreFiber, StoreFiberYield};
59use crate::prelude::*;
60use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63use crate::{
64    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65};
66use error_contexts::GlobalErrorContextRefCount;
67use futures::channel::oneshot;
68use futures::future::{self, FutureExt};
69use futures::stream::{FuturesUnordered, StreamExt};
70use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71use std::any::Any;
72use std::borrow::ToOwned;
73use std::boxed::Box;
74use std::cell::UnsafeCell;
75use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76use std::fmt;
77use std::future::Future;
78use std::marker::PhantomData;
79use std::mem::{self, ManuallyDrop, MaybeUninit};
80use std::ops::DerefMut;
81use std::pin::{Pin, pin};
82use std::ptr::{self, NonNull};
83use std::task::{Context, Poll, Waker};
84use std::vec::Vec;
85use table::{TableDebug, TableId};
86use wasmtime_environ::component::{
87    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
88    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
89    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
90    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
91    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
92};
93use wasmtime_environ::packed_option::ReservedValue;
94use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
95
96pub use abort::JoinHandle;
97pub use future_stream_any::{FutureAny, StreamAny};
98pub use futures_and_streams::{
99    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102};
103pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105mod abort;
106mod error_contexts;
107mod future_stream_any;
108mod futures_and_streams;
109pub(crate) mod table;
110pub(crate) mod tls;
111
112/// Constant defined in the Component Model spec to indicate that the async
113/// intrinsic (e.g. `future.write`) has not yet completed.
114const BLOCKED: u32 = 0xffff_ffff;
115
116/// Corresponds to `CallState` in the upstream spec.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and the optional `waitable` provided into the single
    /// 32-bit result required by the canonical ABI.
    ///
    /// The low 4 bits hold the status; the upper 28 bits hold the waitable
    /// handle, if present.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // Every status except `Returned` must be accompanied by a waitable
        // handle; `Returned` must not carry one.
        assert!((self == Status::Returned) == waitable.is_none());
        let handle = waitable.unwrap_or(0);
        // Only 28 bits are available for the handle.
        assert!(handle >> 28 == 0);
        (handle << 4) | self as u32
    }
}
139
140/// Corresponds to `EventCode` in the Component Model spec, plus related payload
141/// data.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event.
    None,
    /// A subtask's state changed; `status` is the payload delivered to the
    /// guest.
    Subtask {
        status: Status,
    },
    /// A `stream.read` operation completed with the given return code.
    StreamRead {
        code: ReturnCode,
        /// NOTE(review): not lowered by `Event::parts` (the waitable handle is
        /// lowered separately); presumably identifies a still-pending
        /// operation for bookkeeping — confirm at the producing sites.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A `stream.write` operation completed with the given return code.
    StreamWrite {
        code: ReturnCode,
        /// See `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A `future.read` operation completed with the given return code.
    FutureRead {
        code: ReturnCode,
        /// See `StreamRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A `future.write` operation completed with the given return code.
    FutureWrite {
        code: ReturnCode,
        /// See `StreamRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// The waiting task was cancelled.
    Cancelled,
}
166
167impl Event {
168    /// Lower this event to core Wasm integers for delivery to the guest.
169    ///
170    /// Note that the waitable handle, if any, is assumed to be lowered
171    /// separately.
172    fn parts(self) -> (u32, u32) {
173        const EVENT_NONE: u32 = 0;
174        const EVENT_SUBTASK: u32 = 1;
175        const EVENT_STREAM_READ: u32 = 2;
176        const EVENT_STREAM_WRITE: u32 = 3;
177        const EVENT_FUTURE_READ: u32 = 4;
178        const EVENT_FUTURE_WRITE: u32 = 5;
179        const EVENT_CANCELLED: u32 = 6;
180        match self {
181            Event::None => (EVENT_NONE, 0),
182            Event::Cancelled => (EVENT_CANCELLED, 0),
183            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188        }
189    }
190}
191
192/// Corresponds to `CallbackCode` in the spec.
mod callback_code {
    // Values mirror the spec's `CallbackCode` discriminants.
    /// The guest task has exited.
    pub const EXIT: u32 = 0;
    /// The guest task wishes to yield before continuing.
    pub const YIELD: u32 = 1;
    /// The guest task wishes to wait for an event.
    pub const WAIT: u32 = 2;
}
198
199/// A flag indicating that the callee is an async-lowered export.
200///
201/// This may be passed to the `async-start` intrinsic from a fused adapter.
202const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
204/// Provides access to either store data (via the `get` method) or the store
205/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
206/// instance to which the current host task belongs.
207///
208/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// Exclusive access to the store for the lifetime of this `Access`.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's data type `T` to the host data `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
213
impl<'a, T, D> Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Creates a new [`Access`] from its component parts.
    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
        Self { store, get_data }
    }

    /// Get mutable access to the store data.
    pub fn data_mut(&mut self) -> &mut T {
        self.store.data_mut()
    }

    /// Get access to `D::Data`, the projection of the store data this
    /// `Access` was created with.
    pub fn get(&mut self) -> D::Data<'_> {
        (self.get_data)(self.data_mut())
    }

    /// Spawn a background task.
    ///
    /// See [`Accessor::spawn`] for details.
    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        // Build a new `Accessor` sharing this access's projection so the
        // spawned task sees the same view of the store data.
        let accessor = Accessor {
            get_data: self.get_data,
            token: StoreToken::new(self.store.as_context_mut()),
        };
        self.store
            .as_context_mut()
            .spawn_with_accessor(accessor, task)
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }
}
256
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    // Delegate to the wrapped store context.
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
268
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    // Delegate to the wrapped store context.
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
278
279/// Provides scoped mutable access to store data in the context of a concurrent
280/// host task future.
281///
282/// This allows multiple host task futures to execute concurrently and access
283/// the store between (but not across) `await` points.
284///
285/// # Rationale
286///
287/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
288/// `D::Data<'_>`. The problem this is solving, however, is that it does not
289/// literally store these values. The basic problem is that when a concurrent
290/// host future is being polled it has access to `&mut T` (and the whole
291/// `Store`) but when it's not being polled it does not have access to these
292/// values. This reflects how the store is only ever polling one future at a
293/// time so the store is effectively being passed between futures.
294///
295/// Rust's `Future` trait, however, has no means of passing a `Store`
296/// temporarily between futures. The [`Context`](std::task::Context) type does
297/// not have the ability to attach arbitrary information to it at this time.
298/// This type, [`Accessor`], is used to bridge this expressivity gap.
299///
300/// The [`Accessor`] type here represents the ability to acquire, temporarily in
301/// a synchronous manner, the current store. The [`Accessor::with`] function
302/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
303/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
304/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
306/// how the store is temporarily made available while a host future is being
307/// polled.
308///
309/// # Implementation
310///
311/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
312/// this type additionally doesn't even have a lifetime parameter. This is
313/// instead a representation of proof of the ability to acquire these while a
314/// future is being polled. Wasmtime will, when it polls a host future,
315/// configure ambient state such that the `Accessor` that a future closes over
316/// will work and be able to access the store.
317///
318/// This has a number of implications for users such as:
319///
320/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
321///   the lifetime of a single future.
322/// * A future is expected to, however, close over an `Accessor` and keep it
323///   alive probably for the duration of the entire future.
324/// * Different host futures will be given different `Accessor`s, and that's
325///   intentional.
326/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
327///   alleviates some otherwise required bounds to be written down.
328///
329/// # Using `Accessor` in `Drop`
330///
331/// The methods on `Accessor` are only expected to work in the context of
332/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
333/// host future can be dropped at any time throughout the system and Wasmtime
334/// store context is not necessarily available at that time. It's recommended to
335/// not use `Accessor` methods in anything connected to a `Drop` implementation
336/// as they will panic and have unintended results. If you run into this though
337/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Ties this accessor to a specific store; used by `Accessor::with` to
    /// recover a `StoreContextMut<T>` from the TLS-provided store.
    token: StoreToken<T>,
    /// Projection from the store's data type `T` to the host data `D::Data`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
346/// A helper trait to take any type of accessor-with-data in functions.
347///
348/// This trait is similar to [`AsContextMut`] except that it's used when
349/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
350/// [`Accessor`] is the main type used in concurrent settings and is passed to
351/// functions such as [`Func::call_concurrent`].
352///
353/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
354/// this trait. This effectively means that regardless of the `D` in
355/// `Accessor<T, D>` it can still be passed to a function which just needs a
356/// store accessor.
357///
358/// Acquiring an [`Accessor`] can be done through
359/// [`StoreContextMut::run_concurrent`] for example or in a host function
360/// through
361/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the underlying [`Accessor`] that this value refers to.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
373
// Forwarding impl: a shared reference to any accessor is itself an accessor,
// letting functions take `impl AsAccessor` while callers pass `&accessor`.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
382
// Base case: an `Accessor` trivially refers to itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
391
392// Note that it is intentional at this time that `Accessor` does not actually
393// store `&mut T` or anything similar. This distinctly enables the `Accessor`
394// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395// that matter). This is used to ergonomically simplify bindings where the
396// majority of the time `Accessor` is closed over in a future which then needs
397// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398// you already have to write `T: 'static`...) it helps to avoid this.
399//
400// Note as well that `Accessor` doesn't actually store its data at all. Instead
401// it's more of a "proof" of what can be accessed from TLS. API design around
402// `Accessor` and functions like `Linker::func_wrap_concurrent` are
403// intentionally made to ensure that `Accessor` is ideally only used in the
404// context that TLS variables are actually set. For example host functions are
405// given `&Accessor`, not `Accessor`, and this prevents them from persisting
406// the value outside of a future. Within the future the TLS variables are all
407// guaranteed to be set while the future is being polled.
408//
409// Finally though this is not an ironclad guarantee, but nor does it need to be.
410// The TLS APIs are designed to panic or otherwise model usage where they're
411// called recursively or similar. It's hoped that code cannot be constructed to
412// actually hit this at runtime but this is not a safety requirement at this
413// time.
414const _: () = {
415    const fn assert<T: Send + Sync>() {}
416    assert::<Accessor<UnsafeCell<u32>>>();
417};
418
impl<T> Accessor<T> {
    /// Creates a new `Accessor` for the store identified by `token`, using
    /// the identity projection (i.e. [`Accessor::with`] hands out the store
    /// data `T` itself).
    ///
    /// Note that no store reference is captured here; the store is recovered
    /// from task-local state (see `tls`) each time it is needed.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
435
impl<T, D> Accessor<T, D>
where
    D: HasData + ?Sized,
{
    /// Run the specified closure, passing it mutable access to the store.
    ///
    /// This function is one of the main building blocks of the [`Accessor`]
    /// type. This yields synchronous, blocking, access to the store via an
    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
    /// providing the ability to access `D` via [`Access::get`]. Note that the
    /// `fun` here is given only temporary access to the store and `T`/`D`
    /// meaning that the return value `R` here is not allowed to capture borrows
    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
    ///
    /// # Panics
    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example if `with` is called within `fun`,
    /// then this function will panic. It is up to the embedder to ensure that
    /// this does not happen.
    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        // Recover the store from task-local state and wrap it, together with
        // this accessor's projection, in a temporary `Access`.
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                get_data: self.get_data,
            })
        })
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }

    /// Changes this accessor to access `D2` instead of the current type
    /// parameter `D`.
    ///
    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
    ///
    /// # Panics
    ///
    /// When using this API the returned value is disconnected from `&self` and
    /// the lifetime binding the `self` argument. An `Accessor` only works
    /// within the context of the closure or async closure that it was
    /// originally given to, however. This means that due to the fact that the
    /// returned value has no lifetime connection it's possible to use the
    /// accessor outside of `&self`, the original accessor, and panic.
    ///
    /// The returned value should only be used within the scope of the original
    /// `Accessor` that `self` refers to.
    pub fn with_getter<D2: HasData>(
        &self,
        get_data: fn(&mut T) -> D2::Data<'_>,
    ) -> Accessor<T, D2> {
        Accessor {
            token: self.token,
            get_data,
        }
    }

    /// Spawn a background task which will receive an `&Accessor<T, D>` and
    /// run concurrently with any other tasks in progress for the current
    /// store.
    ///
    /// This is particularly useful for host functions which return a `stream`
    /// or `future` such that the code to write to the write end of that
    /// `stream` or `future` must run after the function returns.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    ///
    /// # Panics
    ///
    /// Panics if called within a closure provided to the [`Accessor::with`]
    /// function. This can only be called outside an active invocation of
    /// [`Accessor::with`].
    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = self.clone_for_spawn();
        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
    }

    // Private duplication of this accessor for handing to a spawned task.
    // Deliberately not a public `Clone` impl; see the type-level docs.
    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
        }
    }
}
528
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// The provided `accessor` may be used to access the store between (but
    /// not across) `await` points; see [`Accessor::with`].
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
547
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw core Wasm values passed as parameters.
        params: Vec<ValRaw>,
        /// Whether the call produces a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw core Wasm values passed as parameters.
        params: Vec<ValRaw>,
        /// Number of results the caller expects.
        result_count: u32,
    },
}
562
563/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`; the suspended fiber is
    /// carried here.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export belonging to the given instance.
    Callback(Instance),
}
571
572/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The guest thread doing the waiting.
        thread: QualifiedThreadId,
        /// When `true`, skip the check that this thread may block here.
        /// NOTE(review): semantics inferred from the field name — confirm at
        /// the suspension sites.
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
597
598/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Like `StartImplicit`, but the closure returns no follow-up call to be
    /// run afterwards.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
619
620impl fmt::Debug for GuestCallKind {
621    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622        match self {
623            Self::DeliverEvent { instance, set } => f
624                .debug_struct("DeliverEvent")
625                .field("instance", instance)
626                .field("set", set)
627                .finish(),
628            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630        }
631    }
632}
633
634/// The target of a suspension intrinsic.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` iff this is the `None` target.
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::Some(_) | SuspensionTarget::SomeSuspended(_) => false,
        }
    }

    /// Returns `true` for either of the `Some*` targets.
    fn is_some(&self) -> bool {
        !self.is_none()
    }
}
650
651/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread on whose behalf the call will run.
    thread: QualifiedThreadId,
    /// What kind of call this is and how to run it.
    kind: GuestCallKind,
}
657
658impl GuestCall {
659    /// Returns whether or not the call is ready to run.
660    ///
661    /// A call will not be ready to run if either:
662    ///
663    /// - the (sub-)component instance to be called has already been entered and
664    /// cannot be reentered until an in-progress call completes
665    ///
666    /// - the call is for a not-yet started task and the (sub-)component
667    /// instance to be called has backpressure enabled
668    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669        let instance = store
670            .concurrent_state_mut()
671            .get_mut(self.thread.task)?
672            .instance;
673        let state = store.instance_state(instance).concurrent_state();
674
675        let ready = match &self.kind {
676            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678            GuestCallKind::StartExplicit(_) => true,
679        };
680        log::trace!(
681            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682            state.do_not_enter,
683            state.backpressure
684        );
685        Ok(ready)
686    }
687}
688
689/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// An arbitrary closure to run with store access on the worker fiber.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
694
695/// Represents a pending work item to be handled by the event loop for a given
696/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume, paired with the relevant (sub-)component instance.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task, paired with the
    /// relevant (sub-)component instance.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
709
710impl fmt::Debug for WorkItem {
711    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712        match self {
713            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715            Self::ResumeThread(instance, thread) => f
716                .debug_tuple("ResumeThread")
717                .field(instance)
718                .field(thread)
719                .finish(),
720            Self::GuestCall(instance, call) => f
721                .debug_tuple("GuestCall")
722                .field(instance)
723                .field(call)
724                .finish(),
725            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726        }
727    }
728}
729
730/// Whether a suspension intrinsic was cancelled or completed
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension ran to completion.
    Completed,
}
736
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// Returns the value produced by `future`.  If the future does not complete
/// immediately, the current fiber is suspended until the event loop reports
/// that the host task has finished.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It completed immediately; check the result and delete the task.
        // (The `Ok` payload here is `()`; the actual value is retrieved from
        // the task's `HostTaskState` below.)
        Poll::Ready(result) => result?,

        // It did not complete immediately; add it to
        // `ConcurrentState::futures` so it will be polled via the event loop;
        // then use `GuestThread::sync_call_set` to wait for the task to
        // complete, suspending the current fiber until it does so.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Remove the `task` from the `sync_call_set` to ensure that when
            // this function returns and the task is deleted that there are no
            // more lingering references to this host task.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Retrieve and return the result.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
821
/// Execute the specified guest call.
///
/// Handling a call may itself produce a follow-up call (e.g. when a
/// callback's return code schedules more work), in which case we loop until
/// no further call is produced.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // Retrieve the event to deliver; it's a runtime bug for this
                // call to have been scheduled without one being present.
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // The guest-visible handle for the waitable, or zero if the
                // event has no associated waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make `call.thread` the current thread for the duration of
                // the callback invocation; the previous thread is restored
                // below.
                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // Temporarily take the callback out of the task so we can
                // invoke it while also passing a mutable borrow of the store.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                // Put the callback back so it can receive future events.
                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // The callback's return code may schedule a follow-up call,
                // which we'll handle on the next loop iteration.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                // Starting an implicit thread may itself produce a follow-up
                // call.
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}
893
894impl<T> Store<T> {
895    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
896    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
897    where
898        T: Send + 'static,
899    {
900        ensure!(
901            self.as_context().0.concurrency_support(),
902            "cannot use `run_concurrent` when Config::concurrency_support disabled",
903        );
904        self.as_context_mut().run_concurrent(fun).await
905    }
906
907    #[doc(hidden)]
908    pub fn assert_concurrent_state_empty(&mut self) {
909        self.as_context_mut().assert_concurrent_state_empty();
910    }
911
912    #[doc(hidden)]
913    pub fn concurrent_state_table_size(&mut self) -> usize {
914        self.as_context_mut().concurrent_state_table_size()
915    }
916
917    /// Convenience wrapper for [`StoreContextMut::spawn`].
918    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
919    where
920        T: 'static,
921    {
922        self.as_context_mut().spawn(task)
923    }
924}
925
926impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        // Per-instance concurrent state must also be empty.
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        // No tasks, threads, or other entries should remain in the table...
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        // ...no queued work items...
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        // ...no currently-running thread...
        assert!(state.current_thread.is_none());
        // ...no pending host/background futures...
        assert!(state.futures_mut().unwrap().is_empty());
        // ...and no outstanding error-context reference counts.
        assert!(state.global_error_context_ref_counts.is_empty());
    }
956
957    /// Helper function to perform tests over the size of the concurrent state
958    /// table which can be useful for detecting leaks.
959    ///
960    /// Only intended for use in Wasmtime's own testing.
961    #[doc(hidden)]
962    pub fn concurrent_state_table_size(&mut self) -> usize {
963        self.0
964            .concurrent_state_mut()
965            .table
966            .get_mut()
967            .iter_mut()
968            .count()
969    }
970
971    /// Spawn a background task to run as part of this instance's event loop.
972    ///
973    /// The task will receive an `&Accessor<U>` and run concurrently with
974    /// any other tasks in progress for the instance.
975    ///
976    /// Note that the task will only make progress if and when the event loop
977    /// for this instance is run.
978    ///
979    /// The returned [`JoinHandle`] may be used to cancel the task.
980    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
981    where
982        T: 'static,
983    {
984        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
985        self.spawn_with_accessor(accessor, task)
986    }
987
988    /// Internal implementation of `spawn` functions where a `store` is
989    /// available along with an `Accessor`.
990    fn spawn_with_accessor<D>(
991        self,
992        accessor: Accessor<T, D>,
993        task: impl AccessorTask<T, D>,
994    ) -> JoinHandle
995    where
996        T: 'static,
997        D: HasData + ?Sized,
998    {
999        // Create an "abortable future" here where internally the future will
1000        // hook calls to poll and possibly spawn more background tasks on each
1001        // iteration.
1002        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1003        self.0
1004            .concurrent_state_mut()
1005            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1006        handle
1007    }
1008
    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// This function will unconditionally return an error if
    /// [`Config::concurrency_support`] is disabled.
    ///
    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.  This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress in order for the closure to make progress.
    /// This is an artifact of Wasmtime's historical implementation of `async`
    /// functions and is the topic of [#11869] and [#11870]. In the timeout
    /// example from above it means that Wasmtime can get "wedged" for a bit
    /// where (a) must progress for a readiness notification of (b) to get
    /// delivered.
    ///
    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which timeouts for example
    /// are based on. Fixing this requires some relatively major refactoring
    /// work within Wasmtime itself. This is a known pitfall otherwise and one
    /// that is intended to be fixed one day. In the meantime it's recommended
    /// to apply timeouts or such to the entire `run_concurrent` call itself
    /// rather than internally.
    ///
    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   wasmtime::{
    /// #     error::{Result},
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }
1102
    /// Like [`Self::run_concurrent`], except that the event loop returns a
    /// `Trap::AsyncDeadlock` error if it runs out of work before `fun`'s
    /// future completes (see `poll_until`).
    ///
    /// Note that, unlike `run_concurrent`, this does not check
    /// [`Config::concurrency_support`]; callers are expected to only use this
    /// when that option is enabled (enforced via `debug_assert!` in
    /// `do_run_concurrent`).
    ///
    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }
1112
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`: run `fun` as part of this store's event
    /// loop until it completes (or, if `trap_on_idle` is set, until the loop
    /// can make no further progress).
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        // Guard against recursively running the event loop; see
        // `check_recursive_run`.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard which drops `value` with the store's TLS context set, so that
        // any destructors which reach the store via `tls::get` can do so.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1156
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts `ConcurrentState::futures` back into the store when
        // dropped (including on early return), since we temporarily take it
        // out each loop iteration below.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Outcome of a single pass over the event loop's work sources.
            enum PollResult<R> {
                Complete(R),
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if
                // any.  This will be either all of the high-priority work
                // items, or if there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut();
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard which cleans up any not-yet-handled work items if
                    // we exit early (e.g. due to an error from
                    // `handle_work_item`): fibers are disposed and deferred
                    // futures are dropped with the store's TLS context set.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // If we're about to run a low-priority task, first yield to
                    // the executor.  This ensures that it won't be starved of
                    // the ability to e.g. update the readiness of sockets,
                    // etc. which the guest may be using `thread.yield` along
                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
                    //
                    // This works for e.g. `thread.yield` and callbacks which
                    // return `CALLBACK_CODE_YIELD` because we queue a low
                    // priority item to resume the task (i.e. resume the thread
                    // or call the callback, respectively) just prior to
                    // suspending it.  Indeed, as of this writing those are the
                    // _only_ situations we queue low-priority tasks.
                    // Therefore, we interpret the guest's request to yield as
                    // meaning "yield to other guest tasks _and_/_or_ host
                    // operations such as updating socket readiness", the latter
                    // being the async runtime's responsibility.
                    //
                    // In the future, if this ends up causing measurable
                    // performance issues, this could be optimized such that we
                    // only yield periodically (e.g. for batches of low priority
                    // items) and not for each and every individual item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1371
    /// Handle the specified work item, possibly resuming a fiber if applicable.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            // Add a deferred future to `ConcurrentState::futures` so the
            // event loop will poll it.
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures_mut()?
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            // Resume a previously-suspended guest thread; it must currently be
            // in the `Ready` state, holding its fiber.
            WorkItem::ResumeThread(_, thread) => {
                if let GuestThreadState::Ready(fiber) = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail_bug!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    // The call can proceed now; run it on the worker fiber.
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call can't proceed yet.  Notify the caller (at most
                    // once) that the task is starting, then park the call in
                    // the callee instance's pending queue.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }
1431
    /// Execute the specified guest call on a worker fiber.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        // Reuse the store's cached worker fiber if one exists; otherwise
        // create it.  The worker runs a loop which handles exactly one item
        // per resumption, suspending with `SuspendReason::NeedWork` in
        // between.
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                loop {
                    // The item is handed to the fiber out-of-band via
                    // `ConcurrentState::worker_item` (see below).
                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
                        bail_bug!("worker_item not present when resuming fiber")
                    };
                    match item {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        // Stash the item where the worker loop above will pick it up, then
        // resume the fiber.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }
1461
1462    /// Wrap the specified host function in a future which will call it, passing
1463    /// it an `&Accessor<T>`.
1464    ///
1465    /// See the `Accessor` documentation for details.
1466    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1467    where
1468        T: 'static,
1469        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1470            + Send
1471            + Sync
1472            + 'static,
1473        R: Send + Sync + 'static,
1474    {
1475        let token = StoreToken::new(self);
1476        async move {
1477            let mut accessor = Accessor::new(token);
1478            closure(&mut accessor).await
1479        }
1480    }
1481}
1482
1483impl StoreOpaque {
    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
    /// guest-to-guest call or a sync host-to-guest call.
    ///
    /// This task will only be used for the purpose of handling calls to
    /// intrinsic functions; both parameter lowering and result lifting are
    /// assumed to be taken care of elsewhere.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            return self.enter_call_not_concurrent();
        }

        let state = self.concurrent_state_mut();
        let thread = state.current_thread;
        // If a guest thread is currently running, its task's instance is the
        // caller; otherwise the caller is the host.
        let instance = if let Some(thread) = thread.guest() {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }
        let task = GuestTask::new(
            // Parameter lowering and result lifting are handled elsewhere for
            // sync calls (see the doc comment above), so these closures
            // should never actually be invoked.
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            LiftResult {
                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(thread) = thread.guest() {
                Caller::Guest { thread: *thread }
            } else {
                Caller::Host {
                    tx: None,
                    host_future_present: false,
                    caller: thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;

        // Register the new task plus an implicit thread for it, and record
        // the thread in the callee instance's handle table.
        let guest_task = state.push(task)?;
        let new_thread = GuestThread::new_implicit(state, guest_task)?;
        let guest_thread = state.push(new_thread)?;
        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread,
            self,
            callee.index,
        )?;

        let state = self.concurrent_state_mut();
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        // Make the new thread current for the duration of the call; popped
        // again by `exit_guest_sync_call`.
        self.set_thread(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        })?;

        Ok(())
    }
1552
    /// Pop a `GuestTask` previously pushed using `enter_guest_sync_call`.
    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(self.exit_call_not_concurrent());
        }
        // Clear the current thread; it must have been a guest thread since
        // `enter_guest_sync_call` set one.
        let thread = match self.set_thread(CurrentThread::None)?.guest() {
            Some(t) => *t,
            None => bail_bug!("expected task when exiting"),
        };
        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
        log::trace!("exit sync call {instance:?}");
        // Remove the implicit thread created in `enter_guest_sync_call` from
        // the instance table and delete its associated state.
        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
            self,
            thread,
            instance.index,
        )?;

        // Restore the caller (guest thread or host) as the current thread.
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        let caller = match &task.caller {
            &Caller::Guest { thread } => thread.into(),
            &Caller::Host { caller, .. } => caller,
        };
        self.set_thread(caller)?;

        // Delete the task itself if nothing else still references it.
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        if task.ready_to_delete() {
            state.delete(thread.task)?;
        }

        Ok(())
    }
1586
    /// Similar to `enter_guest_sync_call` except for when the guest makes a
    /// transition to the host.
    ///
    /// Returns `None` when concurrency support is disabled; in that case
    /// `host_task_delete` is later expected to receive `None` as well.
    ///
    /// FIXME: this is called for all guest->host transitions and performs some
    /// relatively expensive table manipulations. This would ideally be
    /// optimized to avoid the full allocation of a `HostTask` in at least some
    /// situations.
    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
        if !self.concurrency_support() {
            self.enter_call_not_concurrent()?;
            return Ok(None);
        }
        // Record the calling guest thread so `host_task_reenter_caller` can
        // restore it later, then make the new host task current.
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
        log::trace!("new host task {task:?}");
        self.set_thread(task)?;
        Ok(Some(task))
    }
1606
1607    /// Invoked before lowering the results of a host task to the guest.
1608    ///
1609    /// This is used to update the current thread annotations within the store
1610    /// to ensure that it reflects the guest task, not the host task, since
1611    /// lowering may execute guest code.
1612    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1613        if !self.concurrency_support() {
1614            return Ok(());
1615        }
1616        let task = self.concurrent_state_mut().current_host_thread()?;
1617        let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1618        self.set_thread(caller)?;
1619        Ok(())
1620    }
1621
1622    /// Dual of `host_task_create` and signifies that the host has finished and
1623    /// will be cleaned up.
1624    ///
1625    /// Note that this isn't invoked when the host is invoked asynchronously and
1626    /// the host isn't complete yet. In that situation the host task persists
1627    /// and will be cleaned up separately in `subtask_drop`
1628    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1629        match task {
1630            Some(task) => {
1631                log::trace!("delete host task {task:?}");
1632                self.concurrent_state_mut().delete(task)?;
1633            }
1634            None => {
1635                self.exit_call_not_concurrent();
1636            }
1637        }
1638        Ok(())
1639    }
1640
    /// Determine whether the specified instance may be entered from the host.
    ///
    /// We return `true` here only if all of the following hold:
    ///
    /// - The top-level instance is not already on the current task's call stack.
    /// - The instance is not in need of a post-return function call.
    /// - `self` has not been poisoned due to a trap.
    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
        // A trapped store may never be re-entered.
        if self.trapped() {
            return Ok(false);
        }
        // Without concurrency support there is no call stack to walk.
        if !self.concurrency_support() {
            return Ok(true);
        }
        // Walk the chain of callers starting from the current thread, looking
        // for `instance` already on the stack.
        let state = self.concurrent_state_mut();
        let mut cur = state.current_thread;
        loop {
            match cur {
                CurrentThread::None => break Ok(true),
                CurrentThread::Guest(thread) => {
                    let task = state.get_mut(thread.task)?;

                    // Note that we only compare top-level instance IDs here.
                    // The idea is that the host is not allowed to recursively
                    // enter a top-level instance even if the specific leaf
                    // instance is not on the stack. This is the behavior
                    // defined in the spec, and it allows us to elide runtime
                    // checks in guest-to-guest adapters.
                    if task.instance.instance == instance.instance {
                        break Ok(false);
                    }
                    cur = match task.caller {
                        Caller::Host { caller, .. } => caller,
                        Caller::Guest { thread } => thread.into(),
                    };
                }
                CurrentThread::Host(id) => {
                    // Host tasks record the guest thread which called them;
                    // continue the walk from that caller.
                    cur = state.get_mut(id)?.caller.into();
                }
            }
        }
    }
1683
1684    /// Helper function to retrieve the `InstanceState` for the
1685    /// specified instance.
1686    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1687        self.component_instance_mut(instance.instance)
1688            .instance_state(instance.index)
1689    }
1690
    /// Configure the currently running `thread`.
    ///
    /// This will save off any state necessary for the previous thread, if
    /// applicable, and then it'll additionally update state for `thread` if
    /// needed too.
    ///
    /// Returns the thread which was current before this call so that callers
    /// may restore it later.
    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
        let thread = thread.into();
        let state = self.concurrent_state_mut();
        let old_thread = mem::replace(&mut state.current_thread, thread);

        // First thing to do after swapping threads is updating the context
        // slots for this thread within the store. This restores the behavior of
        // `context.{get,set}`. This involves taking the old state out of the
        // store, saving it in the thread that's being swapped from, and doing
        // the inverse for the new thread. When debug assertions are enabled
        // this also leaves behind sentinel values to try to uncover bugs where
        // this may be forgotten.
        if let Some(old_thread) = old_thread.guest() {
            let old_context = self.vm_store_context().component_context;
            self.concurrent_state_mut()
                .get_mut(old_thread.thread)?
                .context = old_context;
        }
        if cfg!(debug_assertions) {
            // Sentinel values: any observation of `u32::MAX` slots before they
            // are overwritten below indicates a missed save/restore.
            self.vm_store_context_mut().component_context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
        }
        if let Some(thread) = thread.guest() {
            let thread = self.concurrent_state_mut().get_mut(thread.thread)?;
            let context = thread.context;
            if cfg!(debug_assertions) {
                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
            }
            self.vm_store_context_mut().component_context = context;
        }

        // Each time we switch threads, we conservatively set `task_may_block`
        // to `false` for the component instance we're switching away from (if
        // any), meaning it will be `false` for any new thread created for that
        // instance unless explicitly set otherwise.
        //
        // Additionally if we're switching to a new thread, set its component
        // instance's `task_may_block` according to where it left off.
        let state = self.concurrent_state_mut();
        if let Some(old_thread) = old_thread.guest() {
            let instance = state.get_mut(old_thread.task)?.instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }

        if thread.guest().is_some() {
            self.set_task_may_block()?;
        }

        Ok(old_thread)
    }
1746
    /// Set the global variable representing whether the current task may block
    /// prior to entering Wasm code.
    ///
    /// Computes the flag from the current guest thread's task via
    /// `ConcurrentState::may_block` and stores it on the task's component
    /// instance, where `check_blocking` later reads it.
    fn set_task_may_block(&mut self) -> Result<()> {
        let state = self.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let instance = state.get_mut(guest_thread.task)?.instance.instance;
        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
        self.component_instance_mut(instance)
            .set_task_may_block(may_block);
        Ok(())
    }
1758
1759    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1760        if !self.concurrency_support() {
1761            return Ok(());
1762        }
1763        let state = self.concurrent_state_mut();
1764        let task = state.current_guest_thread()?.task;
1765        let instance = state.get_mut(task)?.instance.instance;
1766        let task_may_block = self.component_instance(instance).get_task_may_block();
1767
1768        if task_may_block {
1769            Ok(())
1770        } else {
1771            Err(Trap::CannotBlockSyncTask.into())
1772        }
1773    }
1774
1775    /// Record that we're about to enter a (sub-)component instance which does
1776    /// not support more than one concurrent, stackful activation, meaning it
1777    /// cannot be entered again until the next call returns.
1778    fn enter_instance(&mut self, instance: RuntimeInstance) {
1779        log::trace!("enter {instance:?}");
1780        self.instance_state(instance)
1781            .concurrent_state()
1782            .do_not_enter = true;
1783    }
1784
1785    /// Record that we've exited a (sub-)component instance previously entered
1786    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1787    /// See the documentation for the latter for details.
1788    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1789        log::trace!("exit {instance:?}");
1790        self.instance_state(instance)
1791            .concurrent_state()
1792            .do_not_enter = false;
1793        self.partition_pending(instance)
1794    }
1795
    /// Iterate over `InstanceState::pending`, moving any ready items into the
    /// "high priority" work item queue.
    ///
    /// See `GuestCall::is_ready` for details.
    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
        // Take the whole pending map so `self` can be borrowed mutably while
        // each call is inspected; not-yet-ready calls are re-inserted below.
        for (thread, kind) in
            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
        {
            let call = GuestCall { thread, kind };
            if call.is_ready(self)? {
                self.concurrent_state_mut()
                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
            } else {
                // Still not ready; put the call back in the pending map.
                self.instance_state(instance)
                    .concurrent_state()
                    .pending
                    .insert(call.thread, call.kind);
            }
        }

        Ok(())
    }
1818
1819    /// Implements the `backpressure.{inc,dec}` intrinsics.
1820    pub(crate) fn backpressure_modify(
1821        &mut self,
1822        caller_instance: RuntimeInstance,
1823        modify: impl FnOnce(u16) -> Option<u16>,
1824    ) -> Result<()> {
1825        let state = self.instance_state(caller_instance).concurrent_state();
1826        let old = state.backpressure;
1827        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1828        state.backpressure = new;
1829
1830        if old > 0 && new == 0 {
1831            // Backpressure was previously enabled and is now disabled; move any
1832            // newly-eligible guest calls to the "high priority" queue.
1833            self.partition_pending(caller_instance)?;
1834        }
1835
1836        Ok(())
1837    }
1838
    /// Resume the specified fiber, giving it exclusive access to the specified
    /// store.
    ///
    /// If the fiber suspends rather than exiting, it is re-filed according to
    /// the `SuspendReason` it recorded: stored as the idle worker, queued for
    /// later resumption, parked as suspended, or registered as waiting on a
    /// waitable set.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        // Save the current thread so it can be restored once the fiber
        // suspends or exits.
        let old_thread = self.concurrent_state_mut().current_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        let fiber = fiber::resolve_or_release(self, fiber).await?;

        self.set_thread(old_thread)?;

        let state = self.concurrent_state_mut();

        if let Some(ot) = old_thread.guest() {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        // `Some(fiber)` means the fiber suspended instead of running to
        // completion.
        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // See the `SuspendReason` documentation for what each case means.
            let reason = match state.suspend_reason.take() {
                Some(r) => r,
                None => bail_bug!("suspend reason missing when resuming fiber"),
            };
            match reason {
                SuspendReason::NeedWork => {
                    // Keep at most one idle worker fiber around for reuse;
                    // dispose of any extras.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
                    let instance = state.get_mut(thread.task)?.instance.index;
                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }
1893
    /// Suspend the current fiber, storing the reason in
    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
    /// it should be resumed.
    ///
    /// See the `SuspendReason` documentation for details.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // If we're yielding or waiting on behalf of a guest thread, we'll need to
        // pop the call context which manages resource borrows before suspending
        // and then push it again once we've resumed.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Only save the current thread when suspending on behalf of a guest
        // thread; it's restored after the fiber resumes (see below).
        let old_guest_thread = if task.is_some() {
            self.concurrent_state_mut().current_thread
        } else {
            CurrentThread::None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        debug_assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .guest()
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .transpose()?
                .unwrap_or(true)
        );

        // Publish the suspend reason for `resume_fiber` to consume; it must
        // not already be occupied.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        // Actually suspend, releasing the store to whoever resumes us.
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // Execution continues here once the fiber is resumed; restore the
        // guest thread saved above, if any.
        if task.is_some() {
            self.set_thread(old_guest_thread)?;
        }

        Ok(())
    }
1955
    /// Suspend the current guest thread until an event is delivered for
    /// `waitable`.
    ///
    /// Temporarily joins `waitable` to the current thread's dedicated
    /// sync-call waitable set, suspends until woken, then restores the
    /// waitable's original set membership.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        // Remember the set the waitable was previously in so it can be
        // restored below.
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
1970}
1971
1972impl Instance {
    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
    ///
    /// An event stored directly on the task (`GuestTask::event`) takes
    /// precedence over events queued in the waitable set; `Event::Cancelled`
    /// is only delivered from the task when `cancellable` is set.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        // First check for an event stored directly on the task itself.
        let event = &mut state.get_mut(guest_task)?.event;
        if let Some(ev) = event
            && (cancellable || !matches!(ev, Event::Cancelled))
        {
            log::trace!("deliver event {ev:?} to {guest_task:?}");
            let ev = *ev;
            *event = None;
            return Ok(Some((ev, None)));
        }

        // Otherwise pop the next ready waitable from the set, if one was
        // provided and has anything ready.
        let set = match set {
            Some(set) => set,
            None => return Ok(None),
        };
        let waitable = match state.get_mut(set)?.ready.pop_first() {
            Some(v) => v,
            None => return Ok(None),
        };

        let common = waitable.common(state)?;
        let handle = match common.handle {
            Some(h) => h,
            None => bail_bug!("handle not set when delivering event"),
        };
        let event = match common.event.take() {
            Some(e) => e,
            None => bail_bug!("event not set when delivering event"),
        };

        log::trace!(
            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
        );

        // Let the waitable update its own state now that its event has been
        // consumed.
        waitable.on_delivery(store, self, event)?;

        Ok(Some((event, Some((waitable, handle)))))
    }
2021
2022    /// Handle the `CallbackCode` returned from an async-lifted export or its
2023    /// callback.
2024    ///
2025    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2026    /// using `handle_guest_call`.
2027    fn handle_callback_code(
2028        self,
2029        store: &mut StoreOpaque,
2030        guest_thread: QualifiedThreadId,
2031        runtime_instance: RuntimeComponentInstanceIndex,
2032        code: u32,
2033    ) -> Result<Option<GuestCall>> {
2034        let (code, set) = unpack_callback_code(code);
2035
2036        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2037
2038        let state = store.concurrent_state_mut();
2039
2040        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2041            let set = store
2042                .instance_state(RuntimeInstance {
2043                    instance: self.id().instance(),
2044                    index: runtime_instance,
2045                })
2046                .handle_table()
2047                .waitable_set_rep(handle)?;
2048
2049            Ok(TableId::<WaitableSet>::new(set))
2050        };
2051
2052        Ok(match code {
2053            callback_code::EXIT => {
2054                log::trace!("implicit thread {guest_thread:?} completed");
2055                self.cleanup_thread(store, guest_thread, runtime_instance)?;
2056                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2057                if task.threads.is_empty() && !task.returned_or_cancelled() {
2058                    bail!(Trap::NoAsyncResult);
2059                }
2060                if let Caller::Guest { .. } = task.caller {
2061                    task.exited = true;
2062                    task.callback = None;
2063                }
2064                if task.ready_to_delete() {
2065                    Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
2066                }
2067                None
2068            }
2069            callback_code::YIELD => {
2070                let task = state.get_mut(guest_thread.task)?;
2071                // If an `Event::Cancelled` is pending, we'll deliver that;
2072                // otherwise, we'll deliver `Event::None`.  Note that
2073                // `GuestTask::event` is only ever set to one of those two
2074                // `Event` variants.
2075                if let Some(event) = task.event {
2076                    assert!(matches!(event, Event::None | Event::Cancelled));
2077                } else {
2078                    task.event = Some(Event::None);
2079                }
2080                let call = GuestCall {
2081                    thread: guest_thread,
2082                    kind: GuestCallKind::DeliverEvent {
2083                        instance: self,
2084                        set: None,
2085                    },
2086                };
2087                if state.may_block(guest_thread.task)? {
2088                    // Push this thread onto the "low priority" queue so it runs
2089                    // after any other threads have had a chance to run.
2090                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2091                    None
2092                } else {
2093                    // Yielding in a non-blocking context is defined as a no-op
2094                    // according to the spec, so we must run this thread
2095                    // immediately without allowing any others to run.
2096                    Some(call)
2097                }
2098            }
2099            callback_code::WAIT => {
2100                // The task may only return `WAIT` if it was created for a call
2101                // to an async export).  Otherwise, we'll trap.
2102                state.check_blocking_for(guest_thread.task)?;
2103
2104                let set = get_set(store, set)?;
2105                let state = store.concurrent_state_mut();
2106
2107                if state.get_mut(guest_thread.task)?.event.is_some()
2108                    || !state.get_mut(set)?.ready.is_empty()
2109                {
2110                    // An event is immediately available; deliver it ASAP.
2111                    state.push_high_priority(WorkItem::GuestCall(
2112                        runtime_instance,
2113                        GuestCall {
2114                            thread: guest_thread,
2115                            kind: GuestCallKind::DeliverEvent {
2116                                instance: self,
2117                                set: Some(set),
2118                            },
2119                        },
2120                    ));
2121                } else {
2122                    // No event is immediately available.
2123                    //
2124                    // We're waiting, so register to be woken up when an event
2125                    // is published for this waitable set.
2126                    //
2127                    // Here we also set `GuestTask::wake_on_cancel` which allows
2128                    // `subtask.cancel` to interrupt the wait.
2129                    let old = state
2130                        .get_mut(guest_thread.thread)?
2131                        .wake_on_cancel
2132                        .replace(set);
2133                    if !old.is_none() {
2134                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2135                    }
2136                    let old = state
2137                        .get_mut(set)?
2138                        .waiting
2139                        .insert(guest_thread, WaitMode::Callback(self));
2140                    if !old.is_none() {
2141                        bail_bug!("set's waiting set already had this thread registered");
2142                    }
2143                }
2144                None
2145            }
2146            _ => bail!(Trap::UnsupportedCallbackCode),
2147        })
2148    }
2149
    /// Delete all state associated with `guest_thread` and unlink it from its
    /// task.
    ///
    /// This removes the thread's handle from the instance's thread handle
    /// table (if present), deletes completed subtask waitables remaining in
    /// its sync-call set, and finally deletes both the thread and that set.
    fn cleanup_thread(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
    ) -> Result<()> {
        let state = store.concurrent_state_mut();
        let thread_data = state.get_mut(guest_thread.thread)?;
        let sync_call_set = thread_data.sync_call_set;
        // Remove the guest-visible handle for this thread, if one was ever
        // handed out.
        if let Some(guest_id) = thread_data.instance_rep {
            store
                .instance_state(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .thread_handle_table()
                .guest_thread_remove(guest_id)?;
        }
        let state = store.concurrent_state_mut();

        // Clean up any pending subtasks in the sync_call_set
        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        state.delete(guest_thread.thread)?;
        state.delete(sync_call_set)?;
        // Unlink the thread from its owning task.
        let task = state.get_mut(guest_thread.task)?;
        task.threads.remove(&guest_thread.thread);
        Ok(())
    }
2186
2187    /// Add the specified guest call to the "high priority" work item queue, to
2188    /// be started as soon as backpressure and/or reentrance rules allow.
2189    ///
2190    /// SAFETY: The raw pointer arguments must be valid references to guest
2191    /// functions (with the appropriate signatures) when the closures queued by
2192    /// this function are called.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Return a closure which will call the specified function in the scope
        /// of the specified task.
        ///
        /// This will use `GuestTask::lower_params` to lower the parameters, but
        /// will not lift the result; instead, it returns a
        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
        /// any, may be lifted.  Note that an async-lifted export will have
        /// returned its result using the `task.return` intrinsic (or not
        /// returned a result at all, in the case of `task.cancel`), in which
        /// case the "result" of this call will either be a callback code or
        /// nothing.
        ///
        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
        /// the returned closure is called.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                // This buffer serves double duty: first it holds the lowered
                // parameters, then (after the call) the raw core Wasm results.
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                // Mark the thread as running before lowering parameters and
                // making the call.
                store
                    .concurrent_state_mut()
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // `lower_params` is a one-shot closure; it must still be
                // present the first (and only) time this call runs.
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                // SAFETY: Per the contract documented in `make_call's`
                // documentation, `callee` must be a valid pointer.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        // SAFETY: Per the contract described in this function documentation,
        // the `callee` pointer which `call` closes over must be valid when
        // called by the closure we queue below.
        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        // Record which (sub-)instance is being called into; this is used below
        // for reentrance tracking (`enter_instance`/`exit_instance`) and for
        // routing the queued work item.
        let callee_instance = store
            .0
            .concurrent_state_mut()
            .get_mut(guest_thread.task)?
            .instance;

        // Build the closure which will actually perform the call when the
        // event loop runs it.  A `callback` implies an async-lifted
        // ("stackless") export; otherwise this is a sync-lifted export or a
        // callback-less ("stackful") async export.
        let fun = if callback.is_some() {
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // SAFETY: `wasmparser` will have validated that the callback
                // function returns a `i32` result.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Unless this is a callback-less (i.e. stackful)
                // async-lifted export, we need to record that the instance
                // cannot be entered until the call returns.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                if async_ {
                    // A stackful async export must have delivered its result
                    // via `task.return` (or been cancelled) before returning,
                    // unless other threads belonging to this task remain.
                    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                    if task.threads.len() == 1 && !task.returned_or_cancelled() {
                        bail!(Trap::NoAsyncResult);
                    }
                } else {
                    // This is a sync-lifted export, so now is when we lift the
                    // result, optionally call the post-return function, if any,
                    // and finally notify any current or future waiters that the
                    // subtask has returned.

                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut();
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // SAFETY: `result_count` represents the number of core Wasm
                    // results returned, per `wasmparser`.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        // SAFETY: `result_count` represents the number of
                        // core Wasm results returned, per `wasmparser`.
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    // SAFETY: Per the contract described in the `queue_call`
                    // documentation, `post_return` (if present) must be a valid
                    // function pointer with the appropriate signature.
                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                store.set_thread(old_thread)?;

                // This is a callback-less call, so the implicit thread has now completed
                self.cleanup_thread(store, guest_thread, callee_instance.index)?;

                let state = store.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;

                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task).delete_from(state)?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                    }
                }

                Ok(None)
            })
        };

        // Queue the call to run as a high-priority work item in the event
        // loop for the callee's instance.
        store
            .0
            .concurrent_state_mut()
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2442
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: StringEncoding,
        caller_info: CallerInfo,
    ) -> Result<()> {
        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // How the caller expects the result to be delivered: spilled to linear
        // memory via a return pointer (`Heap`) or returned directly as flat
        // core Wasm values (`Stack`).
        enum ResultInfo {
            Heap { results: u32 },
            Stack { result_count: u32 },
        }

        // Async callers with a result, and sync callers with more than
        // `MAX_FLAT_RESULTS` results, pass a return pointer as the last
        // parameter; everyone else uses the stack.
        let result_info = match &caller_info {
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("retptr missing"),
                },
            },
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("arg ptr missing"),
                },
            },
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(start);
        let return_ = SendSyncPtr::new(return_);
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let old_thread = state.current_guest_thread()?;

        // The current task must belong to the instance the adapter claims is
        // the caller.
        debug_assert_eq!(
            state.get_mut(old_thread.task)?.instance,
            RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            }
        );

        let new_task = GuestTask::new(
            // `lower_params`: copy the caller's flat parameters and run the
            // synthesized `start` adapter, then notify waiters that the
            // subtask has started.
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes count parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                dst.copy_from_slice(&src[..dst.len()]);
                let state = store.0.concurrent_state_mut();
                Waitable::Guest(state.current_guest_thread()?.task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                // `lift`: run the synthesized `return_` adapter to move the
                // result back to the caller, stashing any flat result for a
                // sync caller in `GuestTask::sync_result`.
                lift: Box::new(move |store, src| {
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }

                    // Execute the `return_` hook, generated by Wasmtime's FACT
                    // compiler, in the context of the old thread. The old
                    // thread, this thread's caller, may have `realloc`
                    // callbacks invoked for example and those need the correct
                    // context set for the current thread.
                    let prev = store.0.set_thread(old_thread)?;

                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }

                    // Restore the previous current thread after the
                    // lifting/lowering has returned.
                    store.0.set_thread(prev)?;

                    let state = store.0.concurrent_state_mut();
                    let thread = state.current_guest_thread()?;
                    if sync_caller {
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding,
            },
            Caller::Guest { thread: old_thread },
            None,
            RuntimeInstance {
                instance: self.id().instance(),
                index: callee_instance,
            },
            callee_async,
        )?;

        // Register the new task and its implicit initial thread with the
        // concurrent state.
        let guest_task = state.push(new_task)?;
        let new_thread = GuestThread::new_implicit(state, guest_task)?;
        let guest_thread = state.push(new_thread)?;
        state.get_mut(guest_task)?.threads.insert(guest_thread);

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        })?;
        log::trace!(
            "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
        );

        Ok(())
    }
2660
2661    /// Call the specified callback function for an async-lifted export.
2662    ///
2663    /// SAFETY: `function` must be a valid reference to a guest function of the
2664    /// correct signature for a callback.
2665    unsafe fn call_callback<T>(
2666        self,
2667        mut store: StoreContextMut<T>,
2668        function: SendSyncPtr<VMFuncRef>,
2669        event: Event,
2670        handle: u32,
2671    ) -> Result<u32> {
2672        let (ordinal, result) = event.parts();
2673        let params = &mut [
2674            ValRaw::u32(ordinal),
2675            ValRaw::u32(handle),
2676            ValRaw::u32(result),
2677        ];
2678        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2679        // `wasmtime-cranelift`-generated fused adapter code or
2680        // `component::Options`.  Per `wasmparser` callback signature
2681        // validation, we know it takes three parameters and returns one.
2682        unsafe {
2683            crate::Func::call_unchecked_raw(
2684                &mut store,
2685                function.as_non_null(),
2686                params.as_mut_slice().into(),
2687            )?;
2688        }
2689        Ok(params[0].get_u32())
2690    }
2691
    /// Start a guest->guest call previously prepared using
    /// `Self::prepare_call`.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
    /// this function immediately after calling `Self::prepare_call`.
    ///
    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
    /// functions with the appropriate signatures for the current guest task.
    /// If this is a call to an async-lowered import, the actual call may be
    /// deferred and run after this function returns, in which case the pointer
    /// arguments must also be valid when the call happens.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        let token = StoreToken::new(store.as_context_mut());
        // A caller that provides no flat-result storage called via an
        // async-lowered import; one that does called via a sync-lowered one.
        let async_caller = storage.is_none();
        let state = store.0.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let callee = SendSyncPtr::new(callee);
        let param_count = usize::try_from(param_count)?;
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count)?;
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if let Some(callback) = NonNull::new(callback) {
            // We're calling an async-lifted export with a callback, so store
            // the callback and related context as part of the task so we can
            // call it later when needed.
            let callback = SendSyncPtr::new(callback);
            task.callback = Some(Box::new(move |store, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per `start_call`'s contract, `callback` is a valid
                // guest function with the callback signature and remains valid
                // if the call is deferred.
                unsafe { self.call_callback::<T>(store, callback, event, handle) }
            }));
        }

        let Caller::Guest { thread: caller } = &task.caller else {
            // As of this writing, `start_call` is only used for guest->guest
            // calls.
            bail_bug!("start_call unexpectedly invoked for host->guest call");
        };
        let caller = *caller;
        let caller_instance = state.get_mut(caller.task)?.instance;

        // Queue the call as a "high priority" work item.
        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut();

        // Use the caller's `GuestThread::sync_call_set` to register interest in
        // the subtask...
        let guest_waitable = Waitable::Guest(guest_thread.task);
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        store.0.set_thread(CurrentThread::None)?;

        // ... and suspend this fiber temporarily while we wait for it to start.
        //
        // Note that we _could_ call the callee directly using the current fiber
        // rather than suspend this one, but that would make reasoning about the
        // event loop more complicated and is probably only worth doing if
        // there's a measurable performance benefit.  In addition, it would mean
        // blocking the caller if the callee calls a blocking sync-lowered
        // import, and as of this writing the spec says we must not do that.
        //
        // Alternatively, the fused adapter code could be modified to call the
        // callee directly without calling a host-provided intrinsic at all (in
        // which case it would need to do its own, inline backpressure checks,
        // etc.).  Again, we'd want to see a measurable performance benefit
        // before committing to such an optimization.  And again, we'd need to
        // update the spec to allow that.
        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // Normally, `StoreOpaque::suspend` would assert it's being
                // called from a context where blocking is allowed.  However, if
                // `async_caller` is `true`, we'll only "block" long enough for
                // the callee to start, i.e. we won't repeat this loop, so we
                // tell `suspend` it's okay even if we're not allowed to block.
                // Alternatively, if the callee is not an async function, then
                // we know it won't block anyway.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut();

            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                bail_bug!("subtasks should only get subtask events, got {event:?}")
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // It returned, so we can stop waiting.
                break (status, None);
            } else if async_caller {
                // It hasn't returned yet, but the caller is calling via an
                // async-lowered import, so we generate a handle for the task
                // waitable and return the status.
                let handle = store
                    .0
                    .instance_state(caller_instance)
                    .handle_table()
                    .subtask_insert_guest(guest_thread.task.rep())?;
                store
                    .0
                    .concurrent_state_mut()
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // The callee hasn't returned yet, and the caller is calling via
                // a sync-lowered import, so we loop and keep waiting until the
                // callee returns.
            }
        };

        // Restore the subtask waitable's original set membership now that
        // we're done waiting.
        guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

        // Reset the current thread to point to the caller as it resumes control.
        store.0.set_thread(caller)?;
        store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            // The caller used a sync-lowered import to call an async-lifted
            // export, in which case the result, if any, has been stashed in
            // `GuestTask::sync_result`.
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take()? {
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        // Pack the final status and the optional waitable handle into a
        // single `u32` return value.
        Ok(status.pack(waitable))
    }
2862
2863    /// Poll the specified future once on behalf of a guest->host call using an
2864    /// async-lowered import.
2865    ///
2866    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2867    /// `Pending`, add it to the set of futures to be polled as part of this
2868    /// instance's event loop until it completes, and then return
2869    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2870    ///
2871    /// Whether the future returns `Ready` immediately or later, the `lower`
2872    /// function will be used to lower the result, if any, into the guest caller's
2873    /// stack and linear memory. The `lower` function is invoked with `None` if
2874    /// the future is cancelled.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        // The token lets the completion closure below recover a typed
        // `StoreContextMut<T>` from a `&mut dyn VMStore` later on.
        let token = StoreToken::new(store.as_context_mut());
        let state = store.0.concurrent_state_mut();
        let task = state.current_host_thread()?;

        // Create an abortable future which hooks calls to poll and manages call
        // context state for the future.
        let (join_handle, future) = JoinHandle::run(future);
        {
            // The task must be freshly started; transition it to "running"
            // and stash the handle so e.g. `subtask.cancel` can abort it.
            let state = &mut state.get_mut(task)?.state;
            assert!(matches!(state, HostTaskState::CalleeStarted));
            *state = HostTaskState::CalleeRunning(join_handle);
        }

        let mut future = Box::pin(future);

        // Finally, poll the future.  We can use a dummy `Waker` here because
        // we'll add the future to `ConcurrentState::futures` and poll it
        // automatically from the event loop if it doesn't complete immediately
        // here.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        match poll {
            // It finished immediately; lower the result and delete the task.
            // Returning `Ok(None)` tells the caller no waitable handle is
            // needed.
            Poll::Ready(result) => {
                let result = result.transpose()?;
                lower(store.as_context_mut(), result)?;
                return Ok(None);
            }

            // Future isn't ready yet, so fall through.
            Poll::Pending => {}
        }

        // It hasn't finished yet; add the future to
        // `ConcurrentState::futures` so it will be polled by the event
        // loop and allocate a waitable handle to return to the guest.

        // Wrap the future in a closure responsible for lowering the result into
        // the guest's stack and memory, as well as notifying any waiters that
        // the task returned.
        let future = Box::pin(async move {
            // `None` here means the future was cancelled (see the
            // `JoinHandle` created above).
            let result = match future.await {
                Some(result) => Some(result?),
                None => None,
            };
            let on_complete = move |store: &mut dyn VMStore| {
                // Restore the `current_thread` to be the host so `lower` knows
                // how to manipulate borrows and knows which scope of borrows
                // to check.
                let mut store = token.as_context_mut(store);
                let old = store.0.set_thread(task)?;

                let status = if result.is_some() {
                    Status::Returned
                } else {
                    Status::ReturnCancelled
                };

                lower(store.as_context_mut(), result)?;
                let state = store.0.concurrent_state_mut();
                match &mut state.get_mut(task)?.state {
                    // The task is already flagged as finished because it was
                    // cancelled. No need to transition further.
                    HostTaskState::CalleeDone { .. } => {}

                    // Otherwise transition this task to the done state.
                    other => *other = HostTaskState::CalleeDone { cancelled: false },
                }
                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

                store.0.set_thread(old)?;
                Ok(())
            };

            // Here we schedule a task to run on a worker fiber to do the
            // lowering since it may involve a call to the guest's realloc
            // function. This is necessary because calling the guest while
            // there are host embedder frames on the stack is unsound.
            tls::get(move |store| {
                store
                    .concurrent_state_mut()
                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                        on_complete,
                    ))));
                Ok(())
            })
        });

        // Make this task visible to the guest and then record what it
        // was made visible as.
        let state = store.0.concurrent_state_mut();
        state.push_future(future);
        // The handle lives in the handle table of the *caller's* instance,
        // so chase `task -> caller -> instance` to find it.
        let caller = state.get_mut(task)?.caller;
        let instance = state.get_mut(caller.task)?.instance;
        let handle = store
            .0
            .instance_state(instance)
            .handle_table()
            .subtask_insert_host(task.rep())?;
        store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

        // Restore the currently running thread to this host task's
        // caller. Note that the host task isn't deallocated as it's
        // within the store and will get deallocated later.
        store.0.set_thread(caller)?;
        Ok(Some(handle))
    }
2993
2994    /// Implements the `task.return` intrinsic, lifting the result for the
2995    /// current guest task.
2996    pub(crate) fn task_return(
2997        self,
2998        store: &mut dyn VMStore,
2999        ty: TypeTupleIndex,
3000        options: OptionsIndex,
3001        storage: &[ValRaw],
3002    ) -> Result<()> {
3003        let state = store.concurrent_state_mut();
3004        let guest_thread = state.current_guest_thread()?;
3005        let lift = state
3006            .get_mut(guest_thread.task)?
3007            .lift_result
3008            .take()
3009            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3010        if !state.get_mut(guest_thread.task)?.result.is_none() {
3011            bail_bug!("task result unexpectedly already set");
3012        }
3013
3014        let CanonicalOptions {
3015            string_encoding,
3016            data_model,
3017            ..
3018        } = &self.id().get(store).component().env_component().options[options];
3019
3020        let invalid = ty != lift.ty
3021            || string_encoding != &lift.string_encoding
3022            || match data_model {
3023                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3024                    Some(memory) => {
3025                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3026                        let actual = self.id().get(store).runtime_memory(memory);
3027                        expected != actual.as_ptr()
3028                    }
3029                    // Memory not specified, meaning it didn't need to be
3030                    // specified per validation, so not invalid.
3031                    None => false,
3032                },
3033                // Always invalid as this isn't supported.
3034                CanonicalOptionsDataModel::Gc { .. } => true,
3035            };
3036
3037        if invalid {
3038            bail!(Trap::TaskReturnInvalid);
3039        }
3040
3041        log::trace!("task.return for {guest_thread:?}");
3042
3043        let result = (lift.lift)(store, storage)?;
3044        self.task_complete(store, guest_thread.task, result, Status::Returned)
3045    }
3046
3047    /// Implements the `task.cancel` intrinsic.
3048    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3049        let state = store.concurrent_state_mut();
3050        let guest_thread = state.current_guest_thread()?;
3051        let task = state.get_mut(guest_thread.task)?;
3052        if !task.cancel_sent {
3053            bail!(Trap::TaskCancelNotCancelled);
3054        }
3055        _ = task
3056            .lift_result
3057            .take()
3058            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3059
3060        if !task.result.is_none() {
3061            bail_bug!("task result should not bet set yet");
3062        }
3063
3064        log::trace!("task.cancel for {guest_thread:?}");
3065
3066        self.task_complete(
3067            store,
3068            guest_thread.task,
3069            Box::new(DummyResult),
3070            Status::ReturnCancelled,
3071        )
3072    }
3073
    /// Complete the specified guest task (i.e. indicate that it has either
    /// returned a (possibly empty) result or cancelled itself).
    ///
    /// This will return any resource borrows and notify any current or future
    /// waiters that the task has completed.
    fn task_complete(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        result: Box<dyn Any + Send + Sync>,
        status: Status,
    ) -> Result<()> {
        // Validate resource-borrow scope exit before completing (presumably
        // trapping if borrows lent to this task weren't returned — see
        // `validate_scope_exit`).
        store
            .component_resource_tables(Some(self))
            .validate_scope_exit()?;

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_task)?;

        if let Caller::Host { tx, .. } = &mut task.caller {
            // Host-initiated call: hand the result directly to the host via
            // the stored sender, if it hasn't already been consumed.  Send
            // errors (receiver gone) are deliberately ignored.
            if let Some(tx) = tx.take() {
                _ = tx.send(result);
            }
        } else {
            // Guest-initiated call: stash the result on the task and notify
            // waiters with a `Subtask` event carrying `status`.
            task.result = Some(result);
            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
        }

        Ok(())
    }
3104
3105    /// Implements the `waitable-set.new` intrinsic.
3106    pub(crate) fn waitable_set_new(
3107        self,
3108        store: &mut StoreOpaque,
3109        caller_instance: RuntimeComponentInstanceIndex,
3110    ) -> Result<u32> {
3111        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3112        let handle = store
3113            .instance_state(RuntimeInstance {
3114                instance: self.id().instance(),
3115                index: caller_instance,
3116            })
3117            .handle_table()
3118            .waitable_set_insert(set.rep())?;
3119        log::trace!("new waitable set {set:?} (handle {handle})");
3120        Ok(handle)
3121    }
3122
3123    /// Implements the `waitable-set.drop` intrinsic.
3124    pub(crate) fn waitable_set_drop(
3125        self,
3126        store: &mut StoreOpaque,
3127        caller_instance: RuntimeComponentInstanceIndex,
3128        set: u32,
3129    ) -> Result<()> {
3130        let rep = store
3131            .instance_state(RuntimeInstance {
3132                instance: self.id().instance(),
3133                index: caller_instance,
3134            })
3135            .handle_table()
3136            .waitable_set_remove(set)?;
3137
3138        log::trace!("drop waitable set {rep} (handle {set})");
3139
3140        // Note that we're careful to check for waiters _before_ deleting the
3141        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3142        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3143        if !store
3144            .concurrent_state_mut()
3145            .get_mut(TableId::<WaitableSet>::new(rep))?
3146            .waiting
3147            .is_empty()
3148        {
3149            bail!(Trap::WaitableSetDropHasWaiters);
3150        }
3151
3152        store
3153            .concurrent_state_mut()
3154            .delete(TableId::<WaitableSet>::new(rep))?;
3155
3156        Ok(())
3157    }
3158
3159    /// Implements the `waitable.join` intrinsic.
3160    pub(crate) fn waitable_join(
3161        self,
3162        store: &mut StoreOpaque,
3163        caller_instance: RuntimeComponentInstanceIndex,
3164        waitable_handle: u32,
3165        set_handle: u32,
3166    ) -> Result<()> {
3167        let mut instance = self.id().get_mut(store);
3168        let waitable =
3169            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3170
3171        let set = if set_handle == 0 {
3172            None
3173        } else {
3174            let set = instance.instance_states().0[caller_instance]
3175                .handle_table()
3176                .waitable_set_rep(set_handle)?;
3177
3178            Some(TableId::<WaitableSet>::new(set))
3179        };
3180
3181        log::trace!(
3182            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3183        );
3184
3185        waitable.join(store.concurrent_state_mut(), set)
3186    }
3187
    /// Implements the `subtask.drop` intrinsic.
    pub(crate) fn subtask_drop(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        task_id: u32,
    ) -> Result<()> {
        // First detach the subtask from any waitable set it may have joined
        // (a set handle of 0 maps to `None` in `waitable_join`).
        self.waitable_join(store, caller_instance, task_id, 0)?;

        // Remove the guest-visible handle, recovering the underlying table
        // rep and whether this subtask is a host task or a guest task.
        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_remove(task_id)?;

        // Validate the subtask is in a droppable (resolved) state and decide
        // whether its table entry can be deleted now.
        let concurrent_state = store.concurrent_state_mut();
        let (waitable, delete) = if is_host {
            let id = TableId::<HostTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            match &task.state {
                // Still running means not yet resolved: dropping it is a trap.
                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
                HostTaskState::CalleeDone { .. } => {}
                // These states should not be reachable for a guest-visible
                // host subtask.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid state for callee in `subtask.drop`")
                }
            }
            // Host tasks are always deleted once droppable.
            (Waitable::Host(id), true)
        } else {
            let id = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(id)?;
            // A pending `lift_result` means the guest task hasn't returned or
            // cancelled yet, so it's not resolved.
            if task.lift_result.is_some() {
                bail!(Trap::SubtaskDropNotResolved);
            }
            // Guest tasks are only deleted when nothing else still needs
            // them (see `ready_to_delete`).
            (
                Waitable::Guest(id),
                concurrent_state.get_mut(id)?.ready_to_delete(),
            )
        };

        // The guest handle was removed above, so clear the recorded handle.
        waitable.common(concurrent_state)?.handle = None;

        // If this subtask has an event that means that the terminal status of
        // this subtask wasn't yet received so it can't be dropped yet.
        if waitable.take_event(concurrent_state)?.is_some() {
            bail!(Trap::SubtaskDropNotResolved);
        }

        if delete {
            waitable.delete_from(concurrent_state)?;
        }

        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
        Ok(())
    }
3244
3245    /// Implements the `waitable-set.wait` intrinsic.
3246    pub(crate) fn waitable_set_wait(
3247        self,
3248        store: &mut StoreOpaque,
3249        options: OptionsIndex,
3250        set: u32,
3251        payload: u32,
3252    ) -> Result<u32> {
3253        if !self.options(store, options).async_ {
3254            // The caller may only call `waitable-set.wait` from an async task
3255            // (i.e. a task created via a call to an async export).
3256            // Otherwise, we'll trap.
3257            store.check_blocking()?;
3258        }
3259
3260        let &CanonicalOptions {
3261            cancellable,
3262            instance: caller_instance,
3263            ..
3264        } = &self.id().get(store).component().env_component().options[options];
3265        let rep = store
3266            .instance_state(RuntimeInstance {
3267                instance: self.id().instance(),
3268                index: caller_instance,
3269            })
3270            .handle_table()
3271            .waitable_set_rep(set)?;
3272
3273        self.waitable_check(
3274            store,
3275            cancellable,
3276            WaitableCheck::Wait,
3277            WaitableCheckParams {
3278                set: TableId::new(rep),
3279                options,
3280                payload,
3281            },
3282        )
3283    }
3284
3285    /// Implements the `waitable-set.poll` intrinsic.
3286    pub(crate) fn waitable_set_poll(
3287        self,
3288        store: &mut StoreOpaque,
3289        options: OptionsIndex,
3290        set: u32,
3291        payload: u32,
3292    ) -> Result<u32> {
3293        let &CanonicalOptions {
3294            cancellable,
3295            instance: caller_instance,
3296            ..
3297        } = &self.id().get(store).component().env_component().options[options];
3298        let rep = store
3299            .instance_state(RuntimeInstance {
3300                instance: self.id().instance(),
3301                index: caller_instance,
3302            })
3303            .handle_table()
3304            .waitable_set_rep(set)?;
3305
3306        self.waitable_check(
3307            store,
3308            cancellable,
3309            WaitableCheck::Poll,
3310            WaitableCheckParams {
3311                set: TableId::new(rep),
3312                options,
3313                payload,
3314            },
3315        )
3316    }
3317
3318    /// Implements the `thread.index` intrinsic.
3319    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3320        let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3321        match store
3322            .concurrent_state_mut()
3323            .get_mut(thread_id)?
3324            .instance_rep
3325        {
3326            Some(r) => Ok(r),
3327            None => bail_bug!("thread should have instance_rep by now"),
3328        }
3329    }
3330
    /// Implements the `thread.new-indirect` intrinsic.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex, // currently unused
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // The start function must be present in the given funcref table and
        // have the signature `(i32) -> ()`; otherwise trap.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        // Build the closure that runs when the thread is actually started
        // (see `resume_thread`): it calls the guest start function with
        // `context` and then cleans the thread up.
        let token = StoreToken::new(store.as_context_mut());
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                // Make the new thread the store's current thread for the
                // duration of the call.
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // Use call_unchecked rather than call or call_async, as we don't want to run the function
                // on a separate fiber if we're running in an async store.
                //
                // SAFETY: `callee` was type-checked above against
                // `(i32) -> ()`, and `params` holds exactly one `i32`.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                store.0.set_thread(old_thread)?;

                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
                log::trace!("explicit thread {guest_thread:?} completed");
                // If this was the task's last thread and the task never
                // returned or cancelled, that's a trap.
                let state = store.0.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                // Mark the thread we're returning to as running again, and
                // delete the task if nothing else still needs it.
                let state = store.0.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                if state.get_mut(guest_thread.task)?.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        // Record the new thread in the concurrent state and in its parent
        // task's thread set.
        let state = store.0.concurrent_state_mut();
        let current_thread = state.current_guest_thread()?;
        let parent_task = current_thread.task;

        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        // Return the guest-visible index for the new thread.
        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3400
    /// Start or resume the guest thread identified by `thread_idx` within
    /// `runtime_instance`.
    ///
    /// Depending on the thread's current state this either schedules its
    /// start function, resumes its suspended fiber, or (when `allow_ready`)
    /// promotes an already-ready thread's work item; any other state traps
    /// with `CannotResumeThread`.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Take ownership of the current state, tentatively marking the thread
        // `Running`; arms that don't actually transition restore the original
        // value below.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                // Schedule the thread's start function as a guest call on the
                // event loop.
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready(fiber) if allow_ready => {
                log::trace!("resuming thread {thread_id:?} that was ready");
                // A ready thread already has a queued work item; restore its
                // state and move that item up the queue instead of queueing a
                // new one.
                thread.state = GuestThreadState::Ready(fiber);
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                // Not a resumable state; put it back and trap.
                thread.state = other;
                bail!(Trap::CannotResumeThread);
            }
        }
        Ok(())
    }
3451
3452    fn add_guest_thread_to_instance_table(
3453        self,
3454        thread_id: TableId<GuestThread>,
3455        store: &mut StoreOpaque,
3456        runtime_instance: RuntimeComponentInstanceIndex,
3457    ) -> Result<u32> {
3458        let guest_id = store
3459            .instance_state(RuntimeInstance {
3460                instance: self.id().instance(),
3461                index: runtime_instance,
3462            })
3463            .thread_handle_table()
3464            .guest_thread_insert(thread_id.rep())?;
3465        store
3466            .concurrent_state_mut()
3467            .get_mut(thread_id)?
3468            .instance_rep = Some(guest_id);
3469        Ok(guest_id)
3470    }
3471
    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
    ///
    /// `yielding` distinguishes the `yield` family from the `suspend` family;
    /// `to_thread` optionally names a target thread to resume before this one
    /// suspends.  When `cancellable`, a pending cancellation request resolves
    /// the call with `WaitResult::Cancelled` instead of (or after) suspending.
    pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: SuspensionTarget,
    ) -> Result<WaitResult> {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // This is a `thread.yield` call
                if !state.may_block(guest_thread.task)? {
                    // In a non-blocking context, a `thread.yield` may trigger
                    // other threads in the same component instance to run.
                    if !state.promote_instance_local_thread_work_item(caller) {
                        // No other threads are runnable, so just return
                        return Ok(WaitResult::Completed);
                    }
                }
            } else {
                // The caller may only call `thread.suspend` from an async task
                // (i.e. a task created via a call to an async export).
                // Otherwise, we'll trap.
                store.check_blocking()?;
            }
        }

        // There could be a pending cancellation from a previous uncancellable wait
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            return Ok(WaitResult::Cancelled);
        }

        // If a target thread was named, schedule it before suspending.  The
        // `SomeSuspended` variant passes `allow_ready == false` (the target
        // must be suspended), while `Some` also accepts a ready target.
        match to_thread {
            SuspensionTarget::SomeSuspended(thread) => {
                self.resume_thread(store, caller, thread, true, false)?
            }
            SuspensionTarget::Some(thread) => {
                self.resume_thread(store, caller, thread, true, true)?
            }
            SuspensionTarget::None => { /* nothing to do */ }
        }

        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.yield-to-suspended` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // Check again after resuming: a cancellation may have arrived while
        // we were suspended.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }
3544
3545    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3546    fn waitable_check(
3547        self,
3548        store: &mut StoreOpaque,
3549        cancellable: bool,
3550        check: WaitableCheck,
3551        params: WaitableCheckParams,
3552    ) -> Result<u32> {
3553        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3554
3555        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3556
3557        let state = store.concurrent_state_mut();
3558        let task = state.get_mut(guest_thread.task)?;
3559
3560        // If we're waiting, and there are no events immediately available,
3561        // suspend the fiber until that changes.
3562        match &check {
3563            WaitableCheck::Wait => {
3564                let set = params.set;
3565
3566                if (task.event.is_none()
3567                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3568                    && state.get_mut(set)?.ready.is_empty()
3569                {
3570                    if cancellable {
3571                        let old = state
3572                            .get_mut(guest_thread.thread)?
3573                            .wake_on_cancel
3574                            .replace(set);
3575                        if !old.is_none() {
3576                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3577                        }
3578                    }
3579
3580                    store.suspend(SuspendReason::Waiting {
3581                        set,
3582                        thread: guest_thread,
3583                        skip_may_block_check: false,
3584                    })?;
3585                }
3586            }
3587            WaitableCheck::Poll => {}
3588        }
3589
3590        log::trace!(
3591            "waitable check for {guest_thread:?}; set {:?}, part two",
3592            params.set
3593        );
3594
3595        // Deliver any pending events to the guest and return.
3596        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3597
3598        let (ordinal, handle, result) = match &check {
3599            WaitableCheck::Wait => {
3600                let (event, waitable) = match event {
3601                    Some(p) => p,
3602                    None => bail_bug!("event expected to be present"),
3603                };
3604                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3605                let (ordinal, result) = event.parts();
3606                (ordinal, handle, result)
3607            }
3608            WaitableCheck::Poll => {
3609                if let Some((event, waitable)) = event {
3610                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3611                    let (ordinal, result) = event.parts();
3612                    (ordinal, handle, result)
3613                } else {
3614                    log::trace!(
3615                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3616                        guest_thread.task,
3617                        params.set
3618                    );
3619                    let (ordinal, result) = Event::None.parts();
3620                    (ordinal, 0, result)
3621                }
3622            }
3623        };
3624        let memory = self.options_memory_mut(store, params.options);
3625        let ptr = func::validate_inbounds_dynamic(
3626            &CanonicalAbiInfo::POINTER_PAIR,
3627            memory,
3628            &ValRaw::u32(params.payload),
3629        )?;
3630        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3631        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3632        Ok(ordinal)
3633    }
3634
3635    /// Implements the `subtask.cancel` intrinsic.
3636    pub(crate) fn subtask_cancel(
3637        self,
3638        store: &mut StoreOpaque,
3639        caller_instance: RuntimeComponentInstanceIndex,
3640        async_: bool,
3641        task_id: u32,
3642    ) -> Result<u32> {
3643        if !async_ {
3644            // The caller may only sync call `subtask.cancel` from an async task
3645            // (i.e. a task created via a call to an async export).  Otherwise,
3646            // we'll trap.
3647            store.check_blocking()?;
3648        }
3649
3650        let (rep, is_host) = store
3651            .instance_state(RuntimeInstance {
3652                instance: self.id().instance(),
3653                index: caller_instance,
3654            })
3655            .handle_table()
3656            .subtask_rep(task_id)?;
3657        let waitable = if is_host {
3658            Waitable::Host(TableId::<HostTask>::new(rep))
3659        } else {
3660            Waitable::Guest(TableId::<GuestTask>::new(rep))
3661        };
3662        let concurrent_state = store.concurrent_state_mut();
3663
3664        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3665
3666        let needs_block;
3667        if let Waitable::Host(host_task) = waitable {
3668            let state = &mut concurrent_state.get_mut(host_task)?.state;
3669            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3670                // If the callee is still running, signal an abort is requested.
3671                //
3672                // After cancelling this falls through to block waiting for the
3673                // host task to actually finish assuming that `async_` is false.
3674                // This blocking behavior resolves the race of `handle.abort()`
3675                // with the task actually getting cancelled or finishing.
3676                HostTaskState::CalleeRunning(handle) => {
3677                    handle.abort();
3678                    needs_block = true;
3679                }
3680
3681                // Cancellation was already requested, so fail as the task can't
3682                // be cancelled twice.
3683                HostTaskState::CalleeDone { cancelled } => {
3684                    if cancelled {
3685                        bail!(Trap::SubtaskCancelAfterTerminal);
3686                    } else {
3687                        // The callee is already done so there's no need to
3688                        // block further for an event.
3689                        needs_block = false;
3690                    }
3691                }
3692
3693                // These states should not be possible for a subtask that's
3694                // visible from the guest, so trap here.
3695                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3696                    bail_bug!("invalid states for host callee")
3697                }
3698            }
3699        } else {
3700            let caller = concurrent_state.current_guest_thread()?;
3701            let guest_task = TableId::<GuestTask>::new(rep);
3702            let task = concurrent_state.get_mut(guest_task)?;
3703            if !task.already_lowered_parameters() {
3704                // The task is in a `starting` state, meaning it hasn't run at
3705                // all yet.  Here we update its fields to indicate that it is
3706                // ready to delete immediately once `subtask.drop` is called.
3707                task.lower_params = None;
3708                task.lift_result = None;
3709                task.exited = true;
3710                let instance = task.instance;
3711
3712                // Clean up the thread within this task as it's now never going
3713                // to run.
3714                assert_eq!(1, task.threads.len());
3715                let thread = *task.threads.iter().next().unwrap();
3716                self.cleanup_thread(
3717                    store,
3718                    QualifiedThreadId {
3719                        task: guest_task,
3720                        thread,
3721                    },
3722                    caller_instance,
3723                )?;
3724
3725                // Not yet started; cancel and remove from pending
3726                let pending = &mut store.instance_state(instance).concurrent_state().pending;
3727                let pending_count = pending.len();
3728                pending.retain(|thread, _| thread.task != guest_task);
3729                // If there were no pending threads for this task, we're in an error state
3730                if pending.len() == pending_count {
3731                    bail!(Trap::SubtaskCancelAfterTerminal);
3732                }
3733                return Ok(Status::StartCancelled as u32);
3734            } else if !task.returned_or_cancelled() {
3735                // Started, but not yet returned or cancelled; send the
3736                // `CANCELLED` event
3737                task.cancel_sent = true;
3738                // Note that this might overwrite an event that was set earlier
3739                // (e.g. `Event::None` if the task is yielding, or
3740                // `Event::Cancelled` if it was already cancelled), but that's
3741                // okay -- this should supersede the previous state.
3742                task.event = Some(Event::Cancelled);
3743                let runtime_instance = task.instance.index;
3744                for thread in task.threads.clone() {
3745                    let thread = QualifiedThreadId {
3746                        task: guest_task,
3747                        thread,
3748                    };
3749                    if let Some(set) = concurrent_state
3750                        .get_mut(thread.thread)?
3751                        .wake_on_cancel
3752                        .take()
3753                    {
3754                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3755                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3756                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3757                                runtime_instance,
3758                                GuestCall {
3759                                    thread,
3760                                    kind: GuestCallKind::DeliverEvent {
3761                                        instance,
3762                                        set: None,
3763                                    },
3764                                },
3765                            ),
3766                            None => bail_bug!("thread not present in wake_on_cancel set"),
3767                        };
3768                        concurrent_state.push_high_priority(item);
3769
3770                        store.suspend(SuspendReason::Yielding {
3771                            thread: caller,
3772                            // `subtask.cancel` is not allowed to be called in a
3773                            // sync context, so we cannot skip the may-block check.
3774                            skip_may_block_check: false,
3775                        })?;
3776                        break;
3777                    }
3778                }
3779
3780                // Guest tasks need to block if they have not yet returned or
3781                // cancelled, even as a result of the event delivery above.
3782                needs_block = !store
3783                    .concurrent_state_mut()
3784                    .get_mut(guest_task)?
3785                    .returned_or_cancelled()
3786            } else {
3787                needs_block = false;
3788            }
3789        };
3790
3791        // If we need to block waiting on the terminal status of this subtask
3792        // then return immediately in `async` mode, or otherwise wait for the
3793        // event to get signaled through the store.
3794        if needs_block {
3795            if async_ {
3796                return Ok(BLOCKED);
3797            }
3798
            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done, fall through to the shared event-handling logic
            // below.
3802            store.wait_for_event(waitable)?;
3803
3804            // .. fall through to determine what event's in store for us.
3805        }
3806
3807        let event = waitable.take_event(store.concurrent_state_mut())?;
3808        if let Some(Event::Subtask {
3809            status: status @ (Status::Returned | Status::ReturnCancelled),
3810        }) = event
3811        {
3812            Ok(status as u32)
3813        } else {
3814            bail!(Trap::SubtaskCancelAfterTerminal);
3815        }
3816    }
3817}
3818
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where one
    /// of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of the two is async.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage`/`storage_len` describe the caller's parameter/result area.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    ///
    /// Returns the encoded result of the write operation.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    ///
    /// Returns the encoded result of the read operation.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    ///
    /// Returns the encoded result of the write operation.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    ///
    /// Returns the encoded result of the read operation.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    ///
    /// Writes the debug message for `err_ctx_handle` into guest memory at
    /// `debug_msg_address`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3986
/// SAFETY: See trait docs.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: The `wasmtime_cranelift`-generated code that calls
        // this method will have ensured that `storage` is a valid
        // pointer containing at least `storage_len` items.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // Decode the sentinel-encoded result count: the two reserved
                // `PREPARE_ASYNC_*` values indicate an async caller (without
                // or with a result, respectively), and any other value is a
                // sync caller's actual result count.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    // No post-return function for this call shape.
                    ptr::null_mut(),
                    callee,
                    param_count,
                    // NOTE(review): result count is hard-coded to 1 here —
                    // sync-lowered callers appear to use a single core result
                    // slot; confirm against `start_call`.
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
                    // this method will have ensured that `storage` is a valid
                    // pointer containing at least `storage_len` items.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                // The returned status is irrelevant for a sync-lowered caller.
                .map(drop)
        }
    }

    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                // Async-lowered callers have no flat storage area to pass.
                None,
            )
        }
    }

    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                // A future transfers a single value, hence the fixed count.
                1,
            )
            .map(|result| result.encode())
    }

    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                // A future transfers a single value, hence the fixed count.
                1,
            )
            .map(|result| result.encode())
    }

    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // No flat-payload fast path for the generic intrinsic.
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // No flat-payload fast path for the generic intrinsic.
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // The flat ABI description enables the memcpy fast path.
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                // The flat ABI description enables the memcpy fast path.
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
4296
/// Type-erased, heap-allocated future driven on behalf of a host task; must be
/// `Send` and `'static` so it can be stored and polled by the event loop.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4298
/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
pub(crate) struct HostTask {
    /// See `WaitableCommon`.
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Current lifecycle state of this task; see `HostTaskState`.
    state: HostTaskState,
}
4314
/// The lifecycle state of a `HostTask`.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block`, which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
    CalleeDone { cancelled: bool },
}
4337
4338impl HostTask {
4339    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4340        Self {
4341            common: WaitableCommon::default(),
4342            call_context: CallContext::default(),
4343            caller,
4344            state,
4345        }
4346    }
4347}
4348
4349impl TableDebug for HostTask {
4350    fn type_name() -> &'static str {
4351        "HostTask"
4352    }
4353}
4354
/// Boxed closure used to invoke an async-lifted export's callback function,
/// delivering an `Event` plus a `u32` payload and yielding the guest's `u32`
/// callback code (interpretation of the codes is defined by the callers).
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4356
/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to an entirely unrelated
        /// top-level component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling guest thread.
        thread: QualifiedThreadId,
    },
}
4377
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// Type-erased closure which lifts the result value.
    lift: RawLift,
    /// Core type of the result tuple, checked against `task.return`'s type.
    ty: TypeTupleIndex,
    /// Linear memory used when lifting, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding in effect for the lift operation.
    string_encoding: StringEncoding,
}
4386
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task which owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4396
4397impl QualifiedThreadId {
4398    fn qualify(
4399        state: &mut ConcurrentState,
4400        thread: TableId<GuestThread>,
4401    ) -> Result<QualifiedThreadId> {
4402        Ok(QualifiedThreadId {
4403            task: state.get_mut(thread)?.parent_task,
4404            thread,
4405        })
4406    }
4407}
4408
4409impl fmt::Debug for QualifiedThreadId {
4410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4411        f.debug_tuple("QualifiedThreadId")
4412            .field(&self.task.rep())
4413            .field(&self.thread.rep())
4414            .finish()
4415    }
4416}
4417
/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// An implicit thread which has not started running yet.
    NotStartedImplicit,
    /// An explicitly-created thread which has not started running yet; holds
    /// the closure used to start it.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    /// The thread is currently running.
    Running,
    /// The thread is suspended; holds the fiber to resume it with later.
    Suspended(StoreFiber<'static>),
    /// The thread's fiber is ready to be resumed.
    Ready(StoreFiber<'static>),
    /// The thread has finished executing.
    Completed,
}
/// Represents a single thread of execution belonging to a guest task.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread.
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
}
4445
4446impl GuestThread {
4447    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4448    /// handle.
4449    fn from_instance(
4450        state: Pin<&mut ComponentInstance>,
4451        caller_instance: RuntimeComponentInstanceIndex,
4452        guest_thread: u32,
4453    ) -> Result<TableId<Self>> {
4454        let rep = state.instance_states().0[caller_instance]
4455            .thread_handle_table()
4456            .guest_thread_rep(guest_thread)?;
4457        Ok(TableId::new(rep))
4458    }
4459
4460    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4461        let sync_call_set = state.push(WaitableSet::default())?;
4462        Ok(Self {
4463            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4464            parent_task,
4465            wake_on_cancel: None,
4466            state: GuestThreadState::NotStartedImplicit,
4467            instance_rep: None,
4468            sync_call_set,
4469        })
4470    }
4471
4472    fn new_explicit(
4473        state: &mut ConcurrentState,
4474        parent_task: TableId<GuestTask>,
4475        start_func: Box<
4476            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4477        >,
4478    ) -> Result<Self> {
4479        let sync_call_set = state.push(WaitableSet::default())?;
4480        Ok(Self {
4481            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4482            parent_task,
4483            wake_on_cancel: None,
4484            state: GuestThreadState::NotStartedExplicit(start_func),
4485            instance_rep: None,
4486            sync_call_set,
4487        })
4488    }
4489}
4490
4491impl TableDebug for GuestThread {
4492    fn type_name() -> &'static str {
4493        "GuestThread"
4494    }
4495}
4496
/// The state of the lowered result slot for a sync-to-async call.
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result was produced but not yet taken; the inner `Option` indicates
    /// whether a lowered value is present.
    Produced(Option<ValRaw>),
    /// The result has already been taken.
    Taken,
}
4502
4503impl SyncResult {
4504    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4505        Ok(match mem::replace(self, SyncResult::Taken) {
4506            SyncResult::NotProduced => None,
4507            SyncResult::Produced(val) => Some(val),
4508            SyncResult::Taken => {
4509                bail_bug!("attempted to take a synchronous result that was already taken")
4510            }
4511        })
4512    }
4513}
4514
/// The state of the host-side future (if any) representing an async call into
/// a guest task; the task can't be deleted while such a future is live.
#[derive(Debug)]
enum HostFutureState {
    /// No host future is associated with the task.
    NotApplicable,
    /// A host future exists and has not yet been dropped.
    Live,
    /// The host future has been dropped.
    Dropped,
}
4521
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
    ///
    /// Keeps track of `borrow<T>` received by this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task.
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,
}
4573
4574impl GuestTask {
4575    fn already_lowered_parameters(&self) -> bool {
4576        // We reset `lower_params` after we lower the parameters
4577        self.lower_params.is_none()
4578    }
4579
4580    fn returned_or_cancelled(&self) -> bool {
4581        // We reset `lift_result` after we return or exit
4582        self.lift_result.is_none()
4583    }
4584
4585    fn ready_to_delete(&self) -> bool {
4586        let threads_completed = self.threads.is_empty();
4587        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4588        let pending_completion_event = matches!(
4589            self.common.event,
4590            Some(Event::Subtask {
4591                status: Status::Returned | Status::ReturnCancelled
4592            })
4593        );
4594        let ready = threads_completed
4595            && !has_sync_result
4596            && !pending_completion_event
4597            && !matches!(self.host_future_state, HostFutureState::Live);
4598        log::trace!(
4599            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4600            threads_completed,
4601            has_sync_result,
4602            pending_completion_event,
4603            self.host_future_state
4604        );
4605        ready
4606    }
4607
4608    fn new(
4609        lower_params: RawLower,
4610        lift_result: LiftResult,
4611        caller: Caller,
4612        callback: Option<CallbackFn>,
4613        instance: RuntimeInstance,
4614        async_function: bool,
4615    ) -> Result<Self> {
4616        let host_future_state = match &caller {
4617            Caller::Guest { .. } => HostFutureState::NotApplicable,
4618            Caller::Host {
4619                host_future_present,
4620                ..
4621            } => {
4622                if *host_future_present {
4623                    HostFutureState::Live
4624                } else {
4625                    HostFutureState::NotApplicable
4626                }
4627            }
4628        };
4629        Ok(Self {
4630            common: WaitableCommon::default(),
4631            lower_params: Some(lower_params),
4632            lift_result: Some(lift_result),
4633            result: None,
4634            callback,
4635            caller,
4636            call_context: CallContext::default(),
4637            sync_result: SyncResult::NotProduced,
4638            cancel_sent: false,
4639            starting_sent: false,
4640            instance,
4641            event: None,
4642            exited: false,
4643            threads: HashSet::new(),
4644            host_future_state,
4645            async_function,
4646        })
4647    }
4648}
4649
/// Name used when rendering `TableId<GuestTask>` entries in debug/trace output.
impl TableDebug for GuestTask {
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4655
/// Represents state common to all kinds of waitables.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    ///
    /// Delivered via `Waitable::mark_ready` and consumed via
    /// `Waitable::take_event`.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4666
/// Represents a Component Model Async `waitable`.
///
/// Each variant wraps the table id of the underlying object; see
/// `Waitable::from_instance` for the mapping from guest-visible handles.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4677
4678impl Waitable {
4679    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4680    /// handle.
4681    fn from_instance(
4682        state: Pin<&mut ComponentInstance>,
4683        caller_instance: RuntimeComponentInstanceIndex,
4684        waitable: u32,
4685    ) -> Result<Self> {
4686        use crate::runtime::vm::component::Waitable;
4687
4688        let (waitable, kind) = state.instance_states().0[caller_instance]
4689            .handle_table()
4690            .waitable_rep(waitable)?;
4691
4692        Ok(match kind {
4693            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4694            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4695            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4696        })
4697    }
4698
4699    /// Retrieve the host-visible identifier for this `Waitable`.
4700    fn rep(&self) -> u32 {
4701        match self {
4702            Self::Host(id) => id.rep(),
4703            Self::Guest(id) => id.rep(),
4704            Self::Transmit(id) => id.rep(),
4705        }
4706    }
4707
4708    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4709    /// remove it from any set it may currently belong to (when `set` is
4710    /// `None`).
4711    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4712        log::trace!("waitable {self:?} join set {set:?}",);
4713
4714        let old = mem::replace(&mut self.common(state)?.set, set);
4715
4716        if let Some(old) = old {
4717            match *self {
4718                Waitable::Host(id) => state.remove_child(id, old),
4719                Waitable::Guest(id) => state.remove_child(id, old),
4720                Waitable::Transmit(id) => state.remove_child(id, old),
4721            }?;
4722
4723            state.get_mut(old)?.ready.remove(self);
4724        }
4725
4726        if let Some(set) = set {
4727            match *self {
4728                Waitable::Host(id) => state.add_child(id, set),
4729                Waitable::Guest(id) => state.add_child(id, set),
4730                Waitable::Transmit(id) => state.add_child(id, set),
4731            }?;
4732
4733            if self.common(state)?.event.is_some() {
4734                self.mark_ready(state)?;
4735            }
4736        }
4737
4738        Ok(())
4739    }
4740
4741    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4742    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4743        Ok(match self {
4744            Self::Host(id) => &mut state.get_mut(*id)?.common,
4745            Self::Guest(id) => &mut state.get_mut(*id)?.common,
4746            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4747        })
4748    }
4749
4750    /// Set or clear the pending event for this waitable and either deliver it
4751    /// to the first waiter, if any, or mark it as ready to be delivered to the
4752    /// next waiter that arrives.
4753    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4754        log::trace!("set event for {self:?}: {event:?}");
4755        self.common(state)?.event = event;
4756        self.mark_ready(state)
4757    }
4758
4759    /// Take the pending event from this waitable, leaving `None` in its place.
4760    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4761        let common = self.common(state)?;
4762        let event = common.event.take();
4763        if let Some(set) = self.common(state)?.set {
4764            state.get_mut(set)?.ready.remove(self);
4765        }
4766
4767        Ok(event)
4768    }
4769
4770    /// Deliver the current event for this waitable to the first waiter, if any,
4771    /// or else mark it as ready to be delivered to the next waiter that
4772    /// arrives.
4773    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4774        if let Some(set) = self.common(state)?.set {
4775            state.get_mut(set)?.ready.insert(*self);
4776            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4777                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4778                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4779
4780                let item = match mode {
4781                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4782                    WaitMode::Callback(instance) => WorkItem::GuestCall(
4783                        state.get_mut(thread.task)?.instance.index,
4784                        GuestCall {
4785                            thread,
4786                            kind: GuestCallKind::DeliverEvent {
4787                                instance,
4788                                set: Some(set),
4789                            },
4790                        },
4791                    ),
4792                };
4793                state.push_high_priority(item);
4794            }
4795        }
4796        Ok(())
4797    }
4798
4799    /// Remove this waitable from the instance's rep table.
4800    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4801        match self {
4802            Self::Host(task) => {
4803                log::trace!("delete host task {task:?}");
4804                state.delete(*task)?;
4805            }
4806            Self::Guest(task) => {
4807                log::trace!("delete guest task {task:?}");
4808                state.delete(*task)?;
4809            }
4810            Self::Transmit(task) => {
4811                state.delete(*task)?;
4812            }
4813        }
4814
4815        Ok(())
4816    }
4817}
4818
/// Formats as the contained `TableId`'s debug representation, regardless of
/// variant.
impl fmt::Debug for Waitable {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Host(id) => write!(f, "{id:?}"),
            Self::Guest(id) => write!(f, "{id:?}"),
            Self::Transmit(id) => write!(f, "{id:?}"),
        }
    }
}
4828
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any.
    ///
    /// The `WaitMode` value records how each thread should be resumed (fiber
    /// resumption vs. callback delivery); see `Waitable::mark_ready`.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4837
/// Name used when rendering `TableId<WaitableSet>` entries in debug/trace
/// output.
impl TableDebug for WaitableSet {
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4843
/// Type-erased closure to lower the parameters for a guest task.
///
/// Writes the lowered core values into the provided slice of uninitialized
/// `ValRaw` slots.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Type-erased closure to lift the result for a guest task.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
///
/// Zero-sized placeholder; carries no data.
struct DummyResult;
4861
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Whether backpressure is set for this instance (enabled if >0)
    backpressure: u16,
    /// Whether this instance can be entered
    do_not_enter: bool,
    /// Pending calls for this instance which are waiting for
    /// `Self::backpressure` to be disabled (i.e. zero) and/or
    /// `Self::do_not_enter` to be false before they can proceed.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4873
impl ConcurrentInstanceState {
    /// Returns whether this instance has no calls waiting on backpressure
    /// and/or reentrance restrictions to clear.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4879
/// Identifies the thread, if any, currently executing in the store's event
/// loop.
///
/// Used e.g. by `ConcurrentState::current_call_context_scope_id` to attribute
/// borrows to the correct task.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread is currently running.
    Guest(QualifiedThreadId),
    /// A host task is currently running.
    Host(TableId<HostTask>),
    /// No thread is currently running.
    None,
}
4886
4887impl CurrentThread {
4888    fn guest(&self) -> Option<&QualifiedThreadId> {
4889        match self {
4890            Self::Guest(id) => Some(id),
4891            _ => None,
4892        }
4893    }
4894
4895    fn host(&self) -> Option<TableId<HostTask>> {
4896        match self {
4897            Self::Host(id) => Some(*id),
4898            _ => None,
4899        }
4900    }
4901
4902    fn is_none(&self) -> bool {
4903        matches!(self, Self::None)
4904    }
4905}
4906
/// A guest thread id converts directly into the `Guest` variant.
impl From<QualifiedThreadId> for CurrentThread {
    fn from(id: QualifiedThreadId) -> Self {
        Self::Guest(id)
    }
}
4912
/// A host task id converts directly into the `Host` variant.
impl From<TableId<HostTask>> for CurrentThread {
    fn from(id: TableId<HostTask>) -> Self {
        Self::Host(id)
    }
}
4918
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running thread, if any.
    current_thread: CurrentThread,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `ComponentInstance::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    low_priority: VecDeque<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4961
4962impl Default for ConcurrentState {
4963    fn default() -> Self {
4964        Self {
4965            current_thread: CurrentThread::None,
4966            table: AlwaysMut::new(ResourceTable::new()),
4967            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4968            high_priority: Vec::new(),
4969            low_priority: VecDeque::new(),
4970            suspend_reason: None,
4971            worker: None,
4972            worker_item: None,
4973            global_error_context_ref_counts: BTreeMap::new(),
4974        }
4975    }
4976}
4977
impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.  This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure than any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Collect fibers parked in waitable sets and in suspended/ready guest
        // threads, leaving the table entries emptied or marked completed.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Drain both work queues: fibers go to the caller; deferred
        // `PushFuture` items are moved into `self.futures` so they're
        // collected below; the remaining item kinds own no fibers or futures.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }

    /// Insert `value` into the store-wide resource table, returning a typed id.
    fn push<V: Send + Sync + 'static>(
        &mut self,
        value: V,
    ) -> Result<TableId<V>, ResourceTableError> {
        self.table.get_mut().push(value).map(TableId::from)
    }

    /// Retrieve mutable access to the table entry identified by `id`.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
        self.table.get_mut().get_mut(&Resource::from(id))
    }

    /// Record `child` as a child of `parent` in the resource table.
    pub fn add_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .add_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove a `child`-of-`parent` link previously added via `add_child`.
    pub fn remove_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .remove_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove and return the table entry identified by `id`.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
        self.table.get_mut().delete(Resource::from(id))
    }

    /// Schedule `future` to be added to `Self::futures`.
    fn push_future(&mut self, future: HostTaskFuture) {
        // Note that we can't directly push to `ConcurrentState::futures` here
        // since this may be called from a future that's being polled inside
        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
        // so it has exclusive access while polling it.  Therefore, we push a
        // work item to the "high priority" queue, which will actually push to
        // `ConcurrentState::futures` later.
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }

    /// Add `item` to the high-priority work queue.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }

    /// Add `item` to the low-priority work queue.
    ///
    /// NOTE(review): this pushes to the *front* of the deque; whether that
    /// yields FIFO or LIFO order depends on which end the event loop pops
    /// from (not visible here) — confirm intended ordering.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push_front(item);
    }

    /// Add `item` to the queue selected by `high_priority`.
    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
        if high_priority {
            self.push_high_priority(item);
        } else {
            self.push_low_priority(item);
        }
    }

    /// Ensure a pending `ResumeThread` or `GuestCall` work item for
    /// `current_instance` is in the high-priority queue, promoting one from
    /// the low-priority queue if necessary.  Returns whether any such item is
    /// now pending.
    fn promote_instance_local_thread_work_item(
        &mut self,
        current_instance: RuntimeComponentInstanceIndex,
    ) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
                *instance == current_instance
            }
            _ => false,
        })
    }

    /// Like `promote_instance_local_thread_work_item`, but matching work items
    /// for a specific guest thread rather than a whole instance.
    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
        self.promote_work_items_matching(|item: &WorkItem| match item {
            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
                *t == thread
            }
            _ => false,
        })
    }

    /// Returns whether a work item matching `predicate` is (now) in the
    /// high-priority queue, promoting one from the low-priority queue if
    /// needed.
    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
    where
        F: FnMut(&WorkItem) -> bool,
    {
        // If there's a high-priority work item to resume the current guest thread,
        // we don't need to promote anything, but we return true to indicate that
        // work is pending for the current instance.
        if self.high_priority.iter().any(&mut predicate) {
            true
        }
        // Otherwise, look for a low-priority work item that matches the current
        // instance and promote it to high-priority.
        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
            let item = self.low_priority.remove(idx).unwrap();
            self.push_high_priority(item);
            true
        } else {
            false
        }
    }

    /// Returns whether there's a pending cancellation on the current guest thread,
    /// consuming the event if so.
    fn take_pending_cancellation(&mut self) -> Result<bool> {
        let thread = self.current_guest_thread()?;
        if let Some(event) = self.get_mut(thread.task)?.event.take() {
            // NOTE(review): `GuestTask::event` is documented to hold either
            // `Event::None` or `Event::Cancelled`; this assert assumes only
            // `Cancelled` is ever observed here — confirm.
            assert!(matches!(event, Event::Cancelled));
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Trap with `CannotBlockSyncTask` unless `task` is permitted to block.
    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
        if self.may_block(task)? {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// A task may block if it was created for an async-lifted export, or if it
    /// has already returned or been cancelled.
    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
        let task = self.get_mut(task)?;
        Ok(task.async_function || task.returned_or_cancelled())
    }

    /// Used by `ResourceTables` to acquire the current `CallContext` for the
    /// specified task.
    ///
    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
    /// below.
    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
        // Low bit distinguishes host tasks from guest tasks; the rest is the
        // table rep.
        let (task, is_host) = (task >> 1, task & 1 == 1);
        if is_host {
            let task: TableId<HostTask> = TableId::new(task);
            Ok(&mut self.get_mut(task)?.call_context)
        } else {
            let task: TableId<GuestTask> = TableId::new(task);
            Ok(&mut self.get_mut(task)?.call_context)
        }
    }

    /// Used by `ResourceTables` to record the scope of a borrow to get undone
    /// in the future.
    pub fn current_call_context_scope_id(&self) -> Result<u32> {
        let (bits, is_host) = match self.current_thread {
            CurrentThread::Guest(id) => (id.task.rep(), false),
            CurrentThread::Host(id) => (id.rep(), true),
            CurrentThread::None => bail_bug!("current thread is not set"),
        };
        // The rep must fit in 31 bits since the low bit carries `is_host`.
        assert_eq!((bits << 1) >> 1, bits);
        Ok((bits << 1) | u32::from(is_host))
    }

    /// The currently running guest thread; an error if the current thread is
    /// not a guest thread.
    fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
        match self.current_thread.guest() {
            Some(id) => Ok(*id),
            None => bail_bug!("current thread is not a guest thread"),
        }
    }

    /// The currently running host task; an error if the current thread is not
    /// a host task.
    fn current_host_thread(&self) -> Result<TableId<HostTask>> {
        match self.current_thread.host() {
            Some(id) => Ok(id),
            None => bail_bug!("current thread is not a host thread"),
        }
    }

    /// Mutable access to the pending-futures collection; an error if it is
    /// currently taken out for polling (see `Self::push_future`).
    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
        match self.futures.get_mut().as_mut() {
            Some(f) => Ok(f),
            None => bail_bug!("futures field of concurrent state is currently taken"),
        }
    }

    /// Mutable access to the store-wide resource table.
    pub(crate) fn table(&mut self) -> &mut ResourceTable {
        self.table.get_mut()
    }
}
5230
/// Provide a type hint to compiler about the shape of a parameter lower
/// closure.
///
/// This is an identity function; passing a closure literal through it pins the
/// closure's inferred type to the `FnOnce` signature in the bound.
fn for_any_lower<
    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}
5240
/// Provide a type hint to compiler about the shape of a result lift closure.
///
/// Identity function; see `for_any_lower` for rationale.
fn for_any_lift<
    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}
5249
/// Wrap the specified future in a `poll_fn` which asserts that the future is
/// only polled from the event loop of the specified `Store`.
///
/// See `StoreContextMut::run_concurrent` for details.
fn checked<F: Future + Send + 'static>(
    id: StoreId,
    fut: F,
) -> impl Future<Output = F::Output> + Send + 'static {
    async move {
        let mut fut = pin!(fut);
        future::poll_fn(move |cx| {
            let message = "\
                `Future`s which depend on asynchronous component tasks, streams, or \
                futures to complete may only be polled from the event loop of the \
                store to which they belong.  Please use \
                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
            ";
            // Panic unless we're being polled from within the event loop of
            // the store identified by `id`, detected via the store handle
            // stashed in task-local storage while that loop runs.
            tls::try_get(|store| {
                let matched = match store {
                    tls::TryGet::Some(store) => store.id() == id,
                    tls::TryGet::Taken | tls::TryGet::None => false,
                };

                if !matched {
                    panic!("{message}")
                }
            });
            // Check passed; forward the poll to the wrapped future.
            fut.as_mut().poll(cx)
        })
        .await
    }
}
5282
5283/// Assert that `StoreContextMut::run_concurrent` has not been called from
5284/// within an store's event loop.
5285fn check_recursive_run() {
5286    tls::try_get(|store| {
5287        if !matches!(store, tls::TryGet::None) {
5288            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5289        }
5290    });
5291}
5292
/// Split a packed callback return code into its `(code, payload)` parts: the
/// low four bits are the code, the remaining high bits the payload.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let kind = code & 0xF;
    let payload = code >> 4;
    (kind, payload)
}
5296
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Canonical ABI options in effect for the calling instance.
    options: OptionsIndex,
    /// Raw `u32` passed by the guest — presumably the linear-memory address
    /// for the resulting event payload; confirm against `waitable_check`.
    payload: u32,
}
5305
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The `waitable-set.wait` intrinsic.
    Wait,
    /// The `waitable-set.poll` intrinsic.
    Poll,
}
5312
/// Represents a guest task called from the host, prepared using `prepare_call`.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties this prepared call to the typed result `R`, which the type-erased
    /// value received on `rx` will be downcast to.
    _phantom: PhantomData<R>,
}
5326
impl<R> PreparedCall<R> {
    /// Get a copy of the `TaskId` for this `PreparedCall`.
    ///
    /// Note that the id captures only the task, not the specific thread.
    pub(crate) fn task_id(&self) -> TaskId {
        TaskId {
            task: self.thread.task,
        }
    }
}
5335
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task backing this handle.
    task: TableId<GuestTask>,
}
5340
5341impl TaskId {
5342    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5343    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5344    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5345    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5346    /// and delete the task when all threads are done.
5347    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5348        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5349        let delete = if !task.already_lowered_parameters() {
5350            true
5351        } else {
5352            task.host_future_state = HostFutureState::Dropped;
5353            task.ready_to_delete()
5354        };
5355        if delete {
5356            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5357        }
5358        Ok(())
5359    }
5360}
5361
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    // Gather the callee's ABI details (canonical options, type info, raw
    // options) before taking a mutable borrow of the concurrent state below.
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    // `StoreToken` lets the `'static` closures constructed below recover a
    // typed `StoreContextMut<T>` from the untyped store handed to them later.
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // Oneshot channel used to deliver the (type-erased) call result back to
    // the host future returned by `queue_call`.
    let (tx, rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    let caller = state.current_thread;
    let task = GuestTask::new(
        // Lowers the caller-supplied parameters into the callee's frame.
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        // Lifts the callee's raw results back into a host value.
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        // If the callee uses a callback (async-with-callback ABI), wrap it so
        // the event loop can deliver events to it.
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // Register the task and an implicit thread for it in the state table, and
    // link the thread into the task's thread set.
    let task = state.push(task)?;
    let new_thread = GuestThread::new_implicit(state, task)?;
    let thread = state.push(new_thread)?;
    state.get_mut(task)?.threads.insert(thread);

    // NOTE(review): this check runs after the task/thread were pushed above;
    // on bailout they remain in the table — presumably reclaimed with the
    // store, but worth confirming.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        _phantom: PhantomData,
    })
}
5453
5454/// Queue a call previously prepared using `prepare_call` to be run as part of
5455/// the associated `ComponentInstance`'s event loop.
5456///
5457/// The returned future will resolve to the result once it is available, but
5458/// must only be polled via the instance's event loop. See
5459/// `StoreContextMut::run_concurrent` for details.
5460pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5461    mut store: StoreContextMut<T>,
5462    prepared: PreparedCall<R>,
5463) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5464    let PreparedCall {
5465        handle,
5466        thread,
5467        param_count,
5468        rx,
5469        ..
5470    } = prepared;
5471
5472    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5473
5474    Ok(checked(
5475        store.0.id(),
5476        rx.map(move |result| match result {
5477            Ok(r) => match r.downcast() {
5478                Ok(r) => Ok(*r),
5479                Err(_) => bail_bug!("wrong type of value produced"),
5480            },
5481            Err(e) => Err(e.into()),
5482        }),
5483    ))
5484}
5485
/// Queue a call previously prepared using `prepare_call` to be run as part of
/// the associated `ComponentInstance`'s event loop.
///
/// This is the untyped half of `queue_call`: it resolves the core function
/// pointers for the callee (plus its optional callback and post-return
/// functions) and hands the call off to the instance's queue.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    // Only the raw canonical options are needed here; the lowering/lifting
    // details were already captured by `prepare_call`.
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
    // (with signatures appropriate for this call) and will remain valid as
    // long as this instance is valid.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}