wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Async.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use crate::component::func::{self, Func, Options};
54use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55use crate::fiber::{self, StoreFiber, StoreFiberYield};
56use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57use crate::vm::component::{
58    CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59};
60use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
61use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
62use anyhow::{Context as _, Result, anyhow, bail};
63use error_contexts::GlobalErrorContextRefCount;
64use futures::channel::oneshot;
65use futures::future::{self, Either, FutureExt};
66use futures::stream::{FuturesUnordered, StreamExt};
67use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
68use std::any::Any;
69use std::borrow::ToOwned;
70use std::boxed::Box;
71use std::cell::UnsafeCell;
72use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
73use std::fmt;
74use std::future::Future;
75use std::marker::PhantomData;
76use std::mem::{self, ManuallyDrop, MaybeUninit};
77use std::ops::DerefMut;
78use std::pin::{Pin, pin};
79use std::ptr::{self, NonNull};
80use std::slice;
81use std::sync::Arc;
82use std::task::{Context, Poll, Waker};
83use std::vec::Vec;
84use table::{TableDebug, TableId};
85use wasmtime_environ::component::{
86    CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
87    OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
88    RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
89    TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
90    TypeTupleIndex,
91};
92
93pub use abort::JoinHandle;
94pub use futures_and_streams::{
95    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
96    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
97    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
98};
99pub(crate) use futures_and_streams::{
100    ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
101};
102
103mod abort;
104mod error_contexts;
105mod futures_and_streams;
106mod table;
107pub(crate) mod tls;
108
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// The value has all 32 bits set, i.e. it is equal to `u32::MAX`.
const BLOCKED: u32 = 0xffff_ffff;
112
/// The state of a call; corresponds to `CallState` in the upstream spec.
///
/// The explicit discriminants are the integer values produced when a status
/// is converted with `as u32` (see [`Status::pack`]).
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Encodes this status, together with the optional `waitable` handle, as
    /// the single 32-bit value that the canonical ABI requires.
    ///
    /// The status occupies the low 4 bits of the result and the waitable (or
    /// zero when there is none) occupies the upper 28 bits.
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied for `Status::Returned`, if no
    /// waitable is supplied for any other status, or if the waitable does not
    /// fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // A waitable accompanies every status except `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        // The handle must leave room for the 4 status bits below it.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
136
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// See `Event::parts` for how each variant is lowered to the pair of integers
/// delivered to the guest.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a zero payload.
    None,
    /// Cancellation event; lowered with a zero payload.
    Cancelled,
    /// Subtask event; the lowered payload is `status`.
    Subtask {
        status: Status,
    },
    /// Stream read event; the lowered payload is the encoded `code`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): `pending` is ignored by `Event::parts`; presumably it
        // identifies the stream (table type and handle) the event relates to
        // -- confirm against the code that consumes it.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Stream write event; the lowered payload is the encoded `code`.
    StreamWrite {
        code: ReturnCode,
        // NOTE(review): see the note on `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// Future read event; the lowered payload is the encoded `code`.
    FutureRead {
        code: ReturnCode,
        // NOTE(review): as for streams, but for a future table -- confirm.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// Future write event; the lowered payload is the encoded `code`.
    FutureWrite {
        code: ReturnCode,
        // NOTE(review): as for streams, but for a future table -- confirm.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
163
164impl Event {
165    /// Lower this event to core Wasm integers for delivery to the guest.
166    ///
167    /// Note that the waitable handle, if any, is assumed to be lowered
168    /// separately.
169    fn parts(self) -> (u32, u32) {
170        const EVENT_NONE: u32 = 0;
171        const EVENT_SUBTASK: u32 = 1;
172        const EVENT_STREAM_READ: u32 = 2;
173        const EVENT_STREAM_WRITE: u32 = 3;
174        const EVENT_FUTURE_READ: u32 = 4;
175        const EVENT_FUTURE_WRITE: u32 = 5;
176        const EVENT_CANCELLED: u32 = 6;
177        match self {
178            Event::None => (EVENT_NONE, 0),
179            Event::Cancelled => (EVENT_CANCELLED, 0),
180            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
181            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
182            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
183            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
184            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
185        }
186    }
187}
188
/// Corresponds to `CallbackCode` in the spec.
///
/// Each constant is the raw `u32` value of the like-named code.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
    pub const POLL: u32 = 3;
}
196
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
///
/// The value is re-exported from `wasmtime_environ` and converted to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
201
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
/// instance to which the current host task belongs.
///
/// See [`Accessor::with`] for details.
// NOTE(review): the struct has no instance field and the inherent `impl` next
// to it has no instance accessor; the "component instance" clause above may
// be stale -- confirm.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// Handle to the store, valid for the lifetime `'a`.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's data `T` to `D::Data<'_>`, applied by
    /// [`Access::get`].
    get_data: fn(&mut T) -> D::Data<'_>,
}
211
212impl<'a, T, D> Access<'a, T, D>
213where
214    D: HasData + ?Sized,
215    T: 'static,
216{
217    /// Creates a new [`Access`] from its component parts.
218    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
219        Self { store, get_data }
220    }
221
222    /// Get mutable access to the store data.
223    pub fn data_mut(&mut self) -> &mut T {
224        self.store.data_mut()
225    }
226
227    /// Get mutable access to the store data.
228    pub fn get(&mut self) -> D::Data<'_> {
229        (self.get_data)(self.data_mut())
230    }
231
232    /// Spawn a background task.
233    ///
234    /// See [`Accessor::spawn`] for details.
235    pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
236    where
237        T: 'static,
238    {
239        let accessor = Accessor {
240            get_data: self.get_data,
241            token: StoreToken::new(self.store.as_context_mut()),
242        };
243        self.store
244            .as_context_mut()
245            .spawn_with_accessor(accessor, task)
246    }
247}
248
/// Lets an [`Access`] be used wherever a shared store context is accepted.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    /// Borrows the underlying store as a [`StoreContext`].
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
260
/// Lets an [`Access`] be used wherever a mutable store context is accepted.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Reborrows the underlying store as a [`StoreContextMut`].
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
270
/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they may panic or have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by [`Accessor::with`] to obtain a `StoreContextMut<T>` from
    /// the store made available through TLS.
    token: StoreToken<T>,
    /// Projection from the store's data `T` to `D::Data<'_>`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
337
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The accessor is only borrowed; no new `Accessor` is created.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
365
366impl<T: AsAccessor + ?Sized> AsAccessor for &T {
367    type Data = T::Data;
368    type AccessorData = T::AccessorData;
369
370    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
371        T::as_accessor(self)
372    }
373}
374
/// An [`Accessor`] trivially refers to itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;

    /// Returns `self` unchanged.
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
383
// Note that it is intentional at this time that `Accessor` does not actually
// store `&mut T` or anything similar. This distinctly enables the `Accessor`
// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
// that matter). This is used to ergonomically simplify bindings where the
// majority of the time `Accessor` is closed over in a future which then needs
// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
// you already have to write `T: 'static`...) it helps to avoid this.
//
// Note as well that `Accessor` doesn't actually store its data at all. Instead
// it's more of a "proof" of what can be accessed from TLS. API design around
// `Accessor` and functions like `Linker::func_wrap_concurrent` are
// intentionally made to ensure that `Accessor` is ideally only used in the
// context that TLS variables are actually set. For example host functions are
// given `&Accessor`, not `Accessor`, and this prevents them from persisting
// the value outside of a future. Within the future the TLS variables are all
// guaranteed to be set while the future is being polled.
//
// Finally though this is not an ironclad guarantee, but nor does it need to be.
// The TLS APIs are designed to panic or otherwise model usage where they're
// called recursively or similar. It's hoped that code cannot be constructed to
// actually hit this at runtime but this is not a safety requirement at this
// time.
//
// The block below is a compile-time check of the `Send`/`Sync` claim above.
const _: () = {
    // Compiles only when `T` is both `Send` and `Sync`.
    const fn assert<T: Send + Sync>() {}
    // `UnsafeCell<u32>` is itself not `Sync`, so this instantiation shows that
    // `Accessor<T>` does not inherit its thread-safety from `T`.
    assert::<Accessor<UnsafeCell<u32>>>();
};
410
impl<T> Accessor<T> {
    /// Creates a new `Accessor` backed by the specified store token.
    ///
    /// - `token`: used to retrieve the store (see [`Accessor::with`])
    ///
    /// The returned accessor uses the identity projection, i.e. its data
    /// getter hands back `&mut T` unchanged.  Use [`Accessor::with_getter`] to
    /// "project" from the store's associated data to another type (e.g. a
    /// field of that data or a wrapper around it).
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
427
428impl<T, D> Accessor<T, D>
429where
430    D: HasData + ?Sized,
431{
432    /// Run the specified closure, passing it mutable access to the store.
433    ///
434    /// This function is one of the main building blocks of the [`Accessor`]
435    /// type. This yields synchronous, blocking, access to store via an
436    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
437    /// providing the ability to access `D` via [`Access::get`]. Note that the
438    /// `fun` here is given only temporary access to the store and `T`/`D`
439    /// meaning that the return value `R` here is not allowed to capture borrows
440    /// into the two. If access is needed to data within `T` or `D` outside of
441    /// this closure then it must be `clone`d out, for example.
442    ///
443    /// # Panics
444    ///
445    /// This function will panic if it is call recursively with any other
446    /// accessor already in scope. For example if `with` is called within `fun`,
447    /// then this function will panic. It is up to the embedder to ensure that
448    /// this does not happen.
449    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
450        tls::get(|vmstore| {
451            fun(Access {
452                store: self.token.as_context_mut(vmstore),
453                get_data: self.get_data,
454            })
455        })
456    }
457
458    /// Returns the getter this accessor is using to project from `T` into
459    /// `D::Data`.
460    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
461        self.get_data
462    }
463
464    /// Changes this accessor to access `D2` instead of the current type
465    /// parameter `D`.
466    ///
467    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
468    ///
469    /// # Panics
470    ///
471    /// When using this API the returned value is disconnected from `&self` and
472    /// the lifetime binding the `self` argument. An `Accessor` only works
473    /// within the context of the closure or async closure that it was
474    /// originally given to, however. This means that due to the fact that the
475    /// returned value has no lifetime connection it's possible to use the
476    /// accessor outside of `&self`, the original accessor, and panic.
477    ///
478    /// The returned value should only be used within the scope of the original
479    /// `Accessor` that `self` refers to.
480    pub fn with_getter<D2: HasData>(
481        &self,
482        get_data: fn(&mut T) -> D2::Data<'_>,
483    ) -> Accessor<T, D2> {
484        Accessor {
485            token: self.token,
486            get_data,
487        }
488    }
489
490    /// Spawn a background task which will receive an `&Accessor<T, D>` and
491    /// run concurrently with any other tasks in progress for the current
492    /// store.
493    ///
494    /// This is particularly useful for host functions which return a `stream`
495    /// or `future` such that the code to write to the write end of that
496    /// `stream` or `future` must run after the function returns.
497    ///
498    /// The returned [`JoinHandle`] may be used to cancel the task.
499    ///
500    /// # Panics
501    ///
502    /// Panics if called within a closure provided to the [`Accessor::with`]
503    /// function. This can only be called outside an active invocation of
504    /// [`Accessor::with`].
505    pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
506    where
507        T: 'static,
508    {
509        let accessor = self.clone_for_spawn();
510        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
511    }
512
513    fn clone_for_spawn(&self) -> Self {
514        Self {
515            token: self.token,
516            get_data: self.get_data,
517        }
518    }
519}
520
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D, R>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// The task is consumed and is lent `accessor`; the returned future must
    /// be `Send` and resolves to the task's result `R`.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
539
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw parameter values for the call.
        params: Vec<ValRaw>,
        // NOTE(review): presumably whether the call produces a result value
        // -- confirm against the code that consumes this field.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw parameter values for the call.
        params: Vec<ValRaw>,
        // NOTE(review): presumably the number of raw result values the caller
        // expects -- confirm against the code that consumes this field.
        result_count: u32,
    },
}
554
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    ///
    /// The payload is a fiber (presumably the suspended one to resume when
    /// the wait completes -- confirm against the code that consumes it).
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    ///
    /// The payload is an [`Instance`] (presumably the one to which the task
    /// belongs -- confirm against the code that consumes it).
    Callback(Instance),
}
563
/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The guest task that is waiting.
        task: TableId<GuestTask>,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    ///
    /// `task` is presumably the guest task that is yielding -- confirm.
    Yielding { task: TableId<GuestTask> },
}
580
/// Represents a pending call into guest code for a given guest task.
///
/// This is the "what to do" half of a [`GuestCall`], which pairs it with the
/// task concerned.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// The closure is given the store and reports failure through its
    /// `Result`.
    Start(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
598
599impl fmt::Debug for GuestCallKind {
600    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
601        match self {
602            Self::DeliverEvent { instance, set } => f
603                .debug_struct("DeliverEvent")
604                .field("instance", instance)
605                .field("set", set)
606                .finish(),
607            Self::Start(_) => f.debug_tuple("Start").finish(),
608        }
609    }
610}
611
/// Represents a pending call into guest code for a given guest task.
#[derive(Debug)]
struct GuestCall {
    /// The guest task the call is for.
    task: TableId<GuestTask>,
    /// What the call will do once it runs.
    kind: GuestCallKind,
}
618
619impl GuestCall {
620    /// Returns whether or not the call is ready to run.
621    ///
622    /// A call will not be ready to run if either:
623    ///
624    /// - the (sub-)component instance to be called has already been entered and
625    /// cannot be reentered until an in-progress call completes
626    ///
627    /// - the call is for a not-yet started task and the (sub-)component
628    /// instance to be called has backpressure enabled
629    fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
630        let task_instance = state.get_mut(self.task)?.instance;
631        let state = state.instance_state(task_instance);
632        let ready = match &self.kind {
633            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
634            GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure > 0),
635        };
636        log::trace!(
637            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
638            state.do_not_enter,
639            state.backpressure
640        );
641        Ok(ready)
642    }
643}
644
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure which is given the store and reports failure
    /// through its `Result`.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
650
/// Represents state related to an in-progress poll operation (e.g. `task.poll`
/// or `CallbackCode.POLL`).
///
/// This is the payload of `WorkItem::Poll`.
#[derive(Debug)]
struct PollParams {
    /// The instance to which the polling task belongs.
    instance: Instance,
    /// The polling task.
    task: TableId<GuestTask>,
    /// The waitable set being polled.
    set: TableId<WaitableSet>,
}
662
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
///
/// `Debug` is implemented by hand for this type; the payloads of
/// `PushFuture`, `ResumeFiber`, and `WorkerFunction` are not printed.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A pending call into guest code for a given guest task.
    GuestCall(GuestCall),
    /// A pending `task.poll` or `CallbackCode.POLL` operation.
    Poll(PollParams),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
677
678impl fmt::Debug for WorkItem {
679    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
680        match self {
681            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
682            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
683            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
684            Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
685            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
686        }
687    }
688}
689
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// # Errors
///
/// Returns the future's own error if it fails on the initial poll, or any
/// error produced while updating the concurrent state or suspending the
/// current fiber.
///
/// # Panics
///
/// Panics if there is no current guest task and the future does not complete
/// on its first poll, or if the stashed result is missing or is not an `R`
/// when it is retrieved at the end.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
    let state = store.concurrent_state_mut();

    // If there is no current guest task set, that means the host function was
    // registered using e.g. `LinkerInstance::func_wrap`, in which case it
    // should complete immediately.
    let Some(caller) = state.guest_task else {
        return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
            Poll::Ready(result) => result,
            Poll::Pending => {
                // Per the comment above, such a future is expected to be
                // ready on its first poll.
                unreachable!()
            }
        };
    };

    // Save any existing result stashed in `GuestTask::result` so we can replace
    // it with the new result.
    let old_result = state
        .get_mut(caller)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();

    // Add a temporary host task into the table so we can track its progress.
    // Note that we'll never allocate a waitable handle for the guest since
    // we're being called synchronously.
    let task = state.push(HostTask::new(caller_instance, None))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    // Wrap the future in a closure which will take care of stashing the result
    // in `GuestTask::result` and resuming this fiber when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        // The store is reached through TLS here because the wrapper future
        // does not hold a reference to it.
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller)?.result = Some(Box::new(result) as _);

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        Poll::Ready(result) => {
            // It completed immediately; check the result and delete the task.
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // It did not complete immediately; add it to
            // `ConcurrentState::futures` so it will be polled via the event
            // loop; then use `GuestTask::sync_call_set` to wait for the task to
            // complete, suspending the current fiber until it does so.
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let set = state.get_mut(caller)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting { set, task: caller })?;
        }
    }

    // Retrieve and return the result, putting back whatever was stashed in
    // `GuestTask::result` before this call.  The wrapper future above stores
    // a boxed `R`, so both the `unwrap` of the option and the `downcast` are
    // expected to succeed.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
791
792/// Execute the specified guest call.
793fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
794    match call.kind {
795        GuestCallKind::DeliverEvent { instance, set } => {
796            let (event, waitable) = instance.get_event(store, call.task, set, true)?.unwrap();
797            let state = store.concurrent_state_mut();
798            let task = state.get_mut(call.task)?;
799            let runtime_instance = task.instance;
800            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
801
802            log::trace!(
803                "use callback to deliver event {event:?} to {:?} for {waitable:?}",
804                call.task,
805            );
806
807            let old_task = state.guest_task.replace(call.task);
808            log::trace!(
809                "GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
810                call.task
811            );
812
813            store.maybe_push_call_context(call.task)?;
814
815            let state = store.concurrent_state_mut();
816            state.enter_instance(runtime_instance);
817
818            let callback = state.get_mut(call.task)?.callback.take().unwrap();
819
820            let code = callback(store, runtime_instance, event, handle)?;
821
822            let state = store.concurrent_state_mut();
823
824            state.get_mut(call.task)?.callback = Some(callback);
825
826            state.exit_instance(runtime_instance)?;
827
828            store.maybe_pop_call_context(call.task)?;
829
830            instance.handle_callback_code(store, call.task, runtime_instance, code, false)?;
831
832            store.concurrent_state_mut().guest_task = old_task;
833            log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
834        }
835        GuestCallKind::Start(fun) => {
836            fun(store)?;
837        }
838    }
839
840    Ok(())
841}
842
843impl<T> Store<T> {
844    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
845    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
846    where
847        T: Send + 'static,
848    {
849        self.as_context_mut().run_concurrent(fun).await
850    }
851
852    #[doc(hidden)]
853    pub fn assert_concurrent_state_empty(&mut self) {
854        self.as_context_mut().assert_concurrent_state_empty();
855    }
856
857    /// Convenience wrapper for [`StoreContextMut::spawn`].
858    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
859    where
860        T: 'static,
861    {
862        self.as_context_mut().spawn(task)
863    }
864}
865
866impl<T> StoreContextMut<'_, T> {
867    /// Assert that all the relevant tables and queues in the concurrent state
868    /// for this store are empty.
869    ///
870    /// This is for sanity checking in integration tests
871    /// (e.g. `component-async-tests`) that the relevant state has been cleared
872    /// after each test concludes.  This should help us catch leaks, e.g. guest
873    /// tasks which haven't been deleted despite having completed and having
874    /// been dropped by their supertasks.
875    #[doc(hidden)]
876    pub fn assert_concurrent_state_empty(self) {
877        let store = self.0;
878        store
879            .store_data_mut()
880            .components
881            .assert_guest_tables_empty();
882        let state = store.concurrent_state_mut();
883        assert!(
884            state.table.get_mut().is_empty(),
885            "non-empty table: {:?}",
886            state.table.get_mut()
887        );
888        assert!(state.high_priority.is_empty());
889        assert!(state.low_priority.is_empty());
890        assert!(state.guest_task.is_none());
891        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
892        assert!(
893            state
894                .instance_states
895                .iter()
896                .all(|(_, state)| state.pending.is_empty())
897        );
898        assert!(state.global_error_context_ref_counts.is_empty());
899    }
900
901    /// Spawn a background task to run as part of this instance's event loop.
902    ///
903    /// The task will receive an `&Accessor<U>` and run concurrently with
904    /// any other tasks in progress for the instance.
905    ///
906    /// Note that the task will only make progress if and when the event loop
907    /// for this instance is run.
908    ///
909    /// The returned [`SpawnHandle`] may be used to cancel the task.
910    pub fn spawn(mut self, task: impl AccessorTask<T, HasSelf<T>, Result<()>>) -> JoinHandle
911    where
912        T: 'static,
913    {
914        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
915        self.spawn_with_accessor(accessor, task)
916    }
917
918    /// Internal implementation of `spawn` functions where a `store` is
919    /// available along with an `Accessor`.
920    fn spawn_with_accessor<D>(
921        self,
922        accessor: Accessor<T, D>,
923        task: impl AccessorTask<T, D, Result<()>>,
924    ) -> JoinHandle
925    where
926        T: 'static,
927        D: HasData + ?Sized,
928    {
929        // Create an "abortable future" here where internally the future will
930        // hook calls to poll and possibly spawn more background tasks on each
931        // iteration.
932        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
933        self.0
934            .concurrent_state_mut()
935            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
936        handle
937    }
938
939    /// Run the specified closure `fun` to completion as part of this store's
940    /// event loop.
941    ///
942    /// This will run `fun` as part of this store's event loop until it
943    /// yields a result.  `fun` is provided an [`Accessor`], which provides
944    /// controlled access to the store and its data.
945    ///
946    /// This function can be used to invoke [`Func::call_concurrent`] for
947    /// example within the async closure provided here.
948    ///
949    /// # Example
950    ///
951    /// ```
952    /// # use {
953    /// #   anyhow::{Result},
954    /// #   wasmtime::{
955    /// #     component::{ Component, Linker, Resource, ResourceTable},
956    /// #     Config, Engine, Store
957    /// #   },
958    /// # };
959    /// #
960    /// # struct MyResource(u32);
961    /// # struct Ctx { table: ResourceTable }
962    /// #
963    /// # async fn foo() -> Result<()> {
964    /// # let mut config = Config::new();
965    /// # let engine = Engine::new(&config)?;
966    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
967    /// # let mut linker = Linker::new(&engine);
968    /// # let component = Component::new(&engine, "")?;
969    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
970    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
971    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
972    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
973    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
974    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
975    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
976    ///    bar.call_concurrent(accessor, (value.0,)).await?;
977    ///    Ok(())
978    /// }).await??;
979    /// # Ok(())
980    /// # }
981    /// ```
982    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
983    where
984        T: Send + 'static,
985    {
986        self.do_run_concurrent(fun, false).await
987    }
988
989    pub(super) async fn run_concurrent_trap_on_idle<R>(
990        self,
991        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
992    ) -> Result<R>
993    where
994        T: Send + 'static,
995    {
996        self.do_run_concurrent(fun, true).await
997    }
998
999    async fn do_run_concurrent<R>(
1000        mut self,
1001        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1002        trap_on_idle: bool,
1003    ) -> Result<R>
1004    where
1005        T: Send + 'static,
1006    {
1007        check_recursive_run();
1008        let token = StoreToken::new(self.as_context_mut());
1009
1010        struct Dropper<'a, T: 'static, V> {
1011            store: StoreContextMut<'a, T>,
1012            value: ManuallyDrop<V>,
1013        }
1014
1015        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1016            fn drop(&mut self) {
1017                tls::set(self.store.0, || {
1018                    // SAFETY: Here we drop the value without moving it for the
1019                    // first and only time -- per the contract for `Drop::drop`,
1020                    // this code won't run again, and the `value` field will no
1021                    // longer be accessible.
1022                    unsafe { ManuallyDrop::drop(&mut self.value) }
1023                });
1024            }
1025        }
1026
1027        let accessor = &Accessor::new(token);
1028        let dropper = &mut Dropper {
1029            store: self,
1030            value: ManuallyDrop::new(fun(accessor)),
1031        };
1032        // SAFETY: We never move `dropper` nor its `value` field.
1033        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1034
1035        dropper
1036            .store
1037            .as_context_mut()
1038            .poll_until(future, trap_on_idle)
1039            .await
1040    }
1041
1042    /// Run this store's event loop.
1043    ///
1044    /// The returned future will resolve when the specified future completes or,
1045    /// if `trap_on_idle` is true, when the event loop can't make further
1046    /// progress.
1047    async fn poll_until<R>(
1048        mut self,
1049        mut future: Pin<&mut impl Future<Output = R>>,
1050        trap_on_idle: bool,
1051    ) -> Result<R>
1052    where
1053        T: Send + 'static,
1054    {
1055        struct Reset<'a, T: 'static> {
1056            store: StoreContextMut<'a, T>,
1057            futures: Option<FuturesUnordered<HostTaskFuture>>,
1058        }
1059
1060        impl<'a, T> Drop for Reset<'a, T> {
1061            fn drop(&mut self) {
1062                if let Some(futures) = self.futures.take() {
1063                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1064                }
1065            }
1066        }
1067
1068        loop {
1069            // Take `ConcurrentState::futures` out of the instance so we can
1070            // poll it while also safely giving any of the futures inside access
1071            // to `self`.
1072            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1073            let mut reset = Reset {
1074                store: self.as_context_mut(),
1075                futures,
1076            };
1077            let mut next = pin!(reset.futures.as_mut().unwrap().next());
1078
1079            let result = future::poll_fn(|cx| {
1080                // First, poll the future we were passed as an argument and
1081                // return immediately if it's ready.
1082                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1083                    return Poll::Ready(Ok(Either::Left(value)));
1084                }
1085
1086                // Next, poll `ConcurrentState::futures` (which includes any
1087                // pending host tasks and/or background tasks), returning
1088                // immediately if one of them fails.
1089                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1090                    Poll::Ready(Some(output)) => {
1091                        match output {
1092                            Err(e) => return Poll::Ready(Err(e)),
1093                            Ok(()) => {}
1094                        }
1095                        Poll::Ready(true)
1096                    }
1097                    Poll::Ready(None) => Poll::Ready(false),
1098                    Poll::Pending => Poll::Pending,
1099                };
1100
1101                // Next, check the "high priority" work queue and return
1102                // immediately if it has at least one item.
1103                let state = reset.store.0.concurrent_state_mut();
1104                let ready = mem::take(&mut state.high_priority);
1105                let ready = if ready.is_empty() {
1106                    // Next, check the "low priority" work queue and return
1107                    // immediately if it has at least one item.
1108                    let ready = mem::take(&mut state.low_priority);
1109                    if ready.is_empty() {
1110                        return match next {
1111                            Poll::Ready(true) => {
1112                                // In this case, one of the futures in
1113                                // `ConcurrentState::futures` completed
1114                                // successfully, so we return now and continue
1115                                // the outer loop in case there is another one
1116                                // ready to complete.
1117                                Poll::Ready(Ok(Either::Right(Vec::new())))
1118                            }
1119                            Poll::Ready(false) => {
1120                                // Poll the future we were passed one last time
1121                                // in case one of `ConcurrentState::futures` had
1122                                // the side effect of unblocking it.
1123                                if let Poll::Ready(value) =
1124                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
1125                                {
1126                                    Poll::Ready(Ok(Either::Left(value)))
1127                                } else {
1128                                    // In this case, there are no more pending
1129                                    // futures in `ConcurrentState::futures`,
1130                                    // there are no remaining work items, _and_
1131                                    // the future we were passed as an argument
1132                                    // still hasn't completed.
1133                                    if trap_on_idle {
1134                                        // `trap_on_idle` is true, so we exit
1135                                        // immediately.
1136                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1137                                    } else {
1138                                        // `trap_on_idle` is false, so we assume
1139                                        // that future will wake up and give us
1140                                        // more work to do when it's ready to.
1141                                        Poll::Pending
1142                                    }
1143                                }
1144                            }
1145                            // There is at least one pending future in
1146                            // `ConcurrentState::futures` and we have nothing
1147                            // else to do but wait for now, so we return
1148                            // `Pending`.
1149                            Poll::Pending => Poll::Pending,
1150                        };
1151                    } else {
1152                        ready
1153                    }
1154                } else {
1155                    ready
1156                };
1157
1158                Poll::Ready(Ok(Either::Right(ready)))
1159            })
1160            .await;
1161
1162            // Put the `ConcurrentState::futures` back into the instance before
1163            // we return or handle any work items since one or more of those
1164            // items might append more futures.
1165            drop(reset);
1166
1167            match result? {
1168                // The future we were passed as an argument completed, so we
1169                // return the result.
1170                Either::Left(value) => break Ok(value),
1171                // The future we were passed has not yet completed, so handle
1172                // any work items and then loop again.
1173                Either::Right(ready) => {
1174                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1175                        store: StoreContextMut<'a, T>,
1176                        ready: I,
1177                    }
1178
1179                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1180                        fn drop(&mut self) {
1181                            while let Some(item) = self.ready.next() {
1182                                match item {
1183                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1184                                    WorkItem::PushFuture(future) => {
1185                                        tls::set(self.store.0, move || drop(future))
1186                                    }
1187                                    _ => {}
1188                                }
1189                            }
1190                        }
1191                    }
1192
1193                    let mut dispose = Dispose {
1194                        store: self.as_context_mut(),
1195                        ready: ready.into_iter(),
1196                    };
1197
1198                    while let Some(item) = dispose.ready.next() {
1199                        dispose
1200                            .store
1201                            .as_context_mut()
1202                            .handle_work_item(item)
1203                            .await?;
1204                    }
1205                }
1206            }
1207        }
1208    }
1209
1210    /// Handle the specified work item, possibly resuming a fiber if applicable.
1211    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1212    where
1213        T: Send,
1214    {
1215        log::trace!("handle work item {item:?}");
1216        match item {
1217            WorkItem::PushFuture(future) => {
1218                self.0
1219                    .concurrent_state_mut()
1220                    .futures
1221                    .get_mut()
1222                    .as_mut()
1223                    .unwrap()
1224                    .push(future.into_inner());
1225            }
1226            WorkItem::ResumeFiber(fiber) => {
1227                self.0.resume_fiber(fiber).await?;
1228            }
1229            WorkItem::GuestCall(call) => {
1230                let state = self.0.concurrent_state_mut();
1231                if call.is_ready(state)? {
1232                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1233                } else {
1234                    let task = state.get_mut(call.task)?;
1235                    if !task.starting_sent {
1236                        task.starting_sent = true;
1237                        if let GuestCallKind::Start(_) = &call.kind {
1238                            Waitable::Guest(call.task).set_event(
1239                                state,
1240                                Some(Event::Subtask {
1241                                    status: Status::Starting,
1242                                }),
1243                            )?;
1244                        }
1245                    }
1246
1247                    let runtime_instance = state.get_mut(call.task)?.instance;
1248                    state
1249                        .instance_state(runtime_instance)
1250                        .pending
1251                        .insert(call.task, call.kind);
1252                }
1253            }
1254            WorkItem::Poll(params) => {
1255                let state = self.0.concurrent_state_mut();
1256                if state.get_mut(params.task)?.event.is_some()
1257                    || !state.get_mut(params.set)?.ready.is_empty()
1258                {
1259                    // There's at least one event immediately available; deliver
1260                    // it to the guest ASAP.
1261                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1262                        task: params.task,
1263                        kind: GuestCallKind::DeliverEvent {
1264                            instance: params.instance,
1265                            set: Some(params.set),
1266                        },
1267                    }));
1268                } else {
1269                    // There are no events immediately available; deliver
1270                    // `Event::None` to the guest.
1271                    state.get_mut(params.task)?.event = Some(Event::None);
1272                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1273                        task: params.task,
1274                        kind: GuestCallKind::DeliverEvent {
1275                            instance: params.instance,
1276                            set: Some(params.set),
1277                        },
1278                    }));
1279                }
1280            }
1281            WorkItem::WorkerFunction(fun) => {
1282                self.run_on_worker(WorkerItem::Function(fun)).await?;
1283            }
1284        }
1285
1286        Ok(())
1287    }
1288
1289    /// Execute the specified guest call on a worker fiber.
1290    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1291    where
1292        T: Send,
1293    {
1294        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1295            fiber
1296        } else {
1297            fiber::make_fiber(self.0, move |store| {
1298                loop {
1299                    match store.concurrent_state_mut().worker_item.take().unwrap() {
1300                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1301                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1302                    }
1303
1304                    store.suspend(SuspendReason::NeedWork)?;
1305                }
1306            })?
1307        };
1308
1309        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1310        assert!(worker_item.is_none());
1311        *worker_item = Some(item);
1312
1313        self.0.resume_fiber(worker).await
1314    }
1315
1316    /// Wrap the specified host function in a future which will call it, passing
1317    /// it an `&Accessor<T>`.
1318    ///
1319    /// See the `Accessor` documentation for details.
1320    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1321    where
1322        T: 'static,
1323        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1324            + Send
1325            + Sync
1326            + 'static,
1327        R: Send + Sync + 'static,
1328    {
1329        let token = StoreToken::new(self);
1330        async move {
1331            let mut accessor = Accessor::new(token);
1332            closure(&mut accessor).await
1333        }
1334    }
1335}
1336
1337impl StoreOpaque {
1338    /// Resume the specified fiber, giving it exclusive access to the specified
1339    /// store.
1340    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1341        let old_task = self.concurrent_state_mut().guest_task;
1342        log::trace!("resume_fiber: save current task {old_task:?}");
1343
1344        let fiber = fiber::resolve_or_release(self, fiber).await?;
1345
1346        let state = self.concurrent_state_mut();
1347
1348        state.guest_task = old_task;
1349        log::trace!("resume_fiber: restore current task {old_task:?}");
1350
1351        if let Some(mut fiber) = fiber {
1352            // See the `SuspendReason` documentation for what each case means.
1353            match state.suspend_reason.take().unwrap() {
1354                SuspendReason::NeedWork => {
1355                    if state.worker.is_none() {
1356                        state.worker = Some(fiber);
1357                    } else {
1358                        fiber.dispose(self);
1359                    }
1360                }
1361                SuspendReason::Yielding { .. } => {
1362                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
1363                }
1364                SuspendReason::Waiting { set, task } => {
1365                    let old = state
1366                        .get_mut(set)?
1367                        .waiting
1368                        .insert(task, WaitMode::Fiber(fiber));
1369                    assert!(old.is_none());
1370                }
1371            }
1372        }
1373
1374        Ok(())
1375    }
1376
1377    /// Suspend the current fiber, storing the reason in
1378    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1379    /// it should be resumed.
1380    ///
1381    /// See the `SuspendReason` documentation for details.
1382    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1383        log::trace!("suspend fiber: {reason:?}");
1384
1385        // If we're yielding or waiting on behalf of a guest task, we'll need to
1386        // pop the call context which manages resource borrows before suspending
1387        // and then push it again once we've resumed.
1388        let task = match &reason {
1389            SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
1390            SuspendReason::NeedWork => None,
1391        };
1392
1393        let old_guest_task = if let Some(task) = task {
1394            self.maybe_pop_call_context(task)?;
1395            self.concurrent_state_mut().guest_task
1396        } else {
1397            None
1398        };
1399
1400        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1401        assert!(suspend_reason.is_none());
1402        *suspend_reason = Some(reason);
1403
1404        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1405
1406        if let Some(task) = task {
1407            self.concurrent_state_mut().guest_task = old_guest_task;
1408            self.maybe_push_call_context(task)?;
1409        }
1410
1411        Ok(())
1412    }
1413
1414    /// Push the call context for managing resource borrows for the specified
1415    /// guest task if it has not yet either returned a result or cancelled
1416    /// itself.
1417    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1418        let task = self.concurrent_state_mut().get_mut(guest_task)?;
1419        if task.lift_result.is_some() {
1420            log::trace!("push call context for {guest_task:?}");
1421            let call_context = task.call_context.take().unwrap();
1422            self.component_resource_state().0.push(call_context);
1423        }
1424        Ok(())
1425    }
1426
1427    /// Pop the call context for managing resource borrows for the specified
1428    /// guest task if it has not yet either returned a result or cancelled
1429    /// itself.
1430    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1431        if self
1432            .concurrent_state_mut()
1433            .get_mut(guest_task)?
1434            .lift_result
1435            .is_some()
1436        {
1437            log::trace!("pop call context for {guest_task:?}");
1438            let call_context = Some(self.component_resource_state().0.pop().unwrap());
1439            self.concurrent_state_mut()
1440                .get_mut(guest_task)?
1441                .call_context = call_context;
1442        }
1443        Ok(())
1444    }
1445
1446    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1447        let state = self.concurrent_state_mut();
1448        let caller = state.guest_task.unwrap();
1449        let old_set = waitable.common(state)?.set;
1450        let set = state.get_mut(caller)?.sync_call_set;
1451        waitable.join(state, Some(set))?;
1452        self.suspend(SuspendReason::Waiting { set, task: caller })?;
1453        let state = self.concurrent_state_mut();
1454        waitable.join(state, old_set)
1455    }
1456}
1457
1458impl Instance {
1459    /// Get the next pending event for the specified task and (optional)
1460    /// waitable set, along with the waitable handle if applicable.
1461    fn get_event(
1462        self,
1463        store: &mut StoreOpaque,
1464        guest_task: TableId<GuestTask>,
1465        set: Option<TableId<WaitableSet>>,
1466        cancellable: bool,
1467    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1468        let state = store.concurrent_state_mut();
1469
1470        if let Some(event) = state.get_mut(guest_task)?.event.take() {
1471            log::trace!("deliver event {event:?} to {guest_task:?}");
1472
1473            if cancellable || !matches!(event, Event::Cancelled) {
1474                return Ok(Some((event, None)));
1475            } else {
1476                state.get_mut(guest_task)?.event = Some(event);
1477            }
1478        }
1479
1480        Ok(
1481            if let Some((set, waitable)) = set
1482                .and_then(|set| {
1483                    state
1484                        .get_mut(set)
1485                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
1486                        .transpose()
1487                })
1488                .transpose()?
1489            {
1490                let common = waitable.common(state)?;
1491                let handle = common.handle.unwrap();
1492                let event = common.event.take().unwrap();
1493
1494                log::trace!(
1495                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1496                );
1497
1498                waitable.on_delivery(self.id().get_mut(store), event);
1499
1500                Some((event, Some((waitable, handle))))
1501            } else {
1502                None
1503            },
1504        )
1505    }
1506
1507    /// Handle the `CallbackCode` returned from an async-lifted export or its
1508    /// callback.
1509    ///
1510    /// If `initial_call` is `true`, then the code was received from the
1511    /// async-lifted export; otherwise, it was received from its callback.
1512    fn handle_callback_code(
1513        self,
1514        store: &mut StoreOpaque,
1515        guest_task: TableId<GuestTask>,
1516        runtime_instance: RuntimeComponentInstanceIndex,
1517        code: u32,
1518        initial_call: bool,
1519    ) -> Result<()> {
1520        let (code, set) = unpack_callback_code(code);
1521
1522        log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");
1523
1524        let state = store.concurrent_state_mut();
1525        let task = state.get_mut(guest_task)?;
1526
1527        if task.lift_result.is_some() {
1528            if code == callback_code::EXIT {
1529                return Err(anyhow!(crate::Trap::NoAsyncResult));
1530            }
1531            if initial_call {
1532                // Notify any current or future waiters that this subtask has
1533                // started.
1534                Waitable::Guest(guest_task).set_event(
1535                    state,
1536                    Some(Event::Subtask {
1537                        status: Status::Started,
1538                    }),
1539                )?;
1540            }
1541        }
1542
1543        let get_set = |store, handle| {
1544            if handle == 0 {
1545                bail!("invalid waitable-set handle");
1546            }
1547
1548            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1549                .waitable_set_rep(handle)?;
1550
1551            Ok(TableId::<WaitableSet>::new(set))
1552        };
1553
1554        match code {
1555            callback_code::EXIT => {
1556                let task = state.get_mut(guest_task)?;
1557                match &task.caller {
1558                    Caller::Host {
1559                        remove_task_automatically,
1560                        ..
1561                    } => {
1562                        if *remove_task_automatically {
1563                            log::trace!("handle_callback_code will delete task {guest_task:?}");
1564                            Waitable::Guest(guest_task).delete_from(state)?;
1565                        }
1566                    }
1567                    Caller::Guest { .. } => {
1568                        task.exited = true;
1569                        task.callback = None;
1570                    }
1571                }
1572            }
1573            callback_code::YIELD => {
1574                // Push this task onto the "low priority" queue so it runs after
1575                // any other tasks have had a chance to run.
1576                let task = state.get_mut(guest_task)?;
1577                assert!(task.event.is_none());
1578                task.event = Some(Event::None);
1579                state.push_low_priority(WorkItem::GuestCall(GuestCall {
1580                    task: guest_task,
1581                    kind: GuestCallKind::DeliverEvent {
1582                        instance: self,
1583                        set: None,
1584                    },
1585                }));
1586            }
1587            callback_code::WAIT | callback_code::POLL => {
1588                let set = get_set(store, set)?;
1589                let state = store.concurrent_state_mut();
1590
1591                if state.get_mut(guest_task)?.event.is_some()
1592                    || !state.get_mut(set)?.ready.is_empty()
1593                {
1594                    // An event is immediately available; deliver it ASAP.
1595                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
1596                        task: guest_task,
1597                        kind: GuestCallKind::DeliverEvent {
1598                            instance: self,
1599                            set: Some(set),
1600                        },
1601                    }));
1602                } else {
1603                    // No event is immediately available.
1604                    match code {
1605                        callback_code::POLL => {
1606                            // We're polling, so just yield and check whether an
1607                            // event has arrived after that.
1608                            state.push_low_priority(WorkItem::Poll(PollParams {
1609                                instance: self,
1610                                task: guest_task,
1611                                set,
1612                            }));
1613                        }
1614                        callback_code::WAIT => {
1615                            // We're waiting, so register to be woken up when an
1616                            // event is published for this waitable set.
1617                            //
1618                            // Here we also set `GuestTask::wake_on_cancel`
1619                            // which allows `subtask.cancel` to interrupt the
1620                            // wait.
1621                            let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
1622                            assert!(old.is_none());
1623                            let old = state
1624                                .get_mut(set)?
1625                                .waiting
1626                                .insert(guest_task, WaitMode::Callback(self));
1627                            assert!(old.is_none());
1628                        }
1629                        _ => unreachable!(),
1630                    }
1631                }
1632            }
1633            _ => bail!("unsupported callback code: {code}"),
1634        }
1635
1636        Ok(())
1637    }
1638
1639    /// Add the specified guest call to the "high priority" work item queue, to
1640    /// be started as soon as backpressure and/or reentrance rules allow.
1641    ///
1642    /// SAFETY: The raw pointer arguments must be valid references to guest
1643    /// functions (with the appropriate signatures) when the closures queued by
1644    /// this function are called.
1645    unsafe fn queue_call<T: 'static>(
1646        self,
1647        mut store: StoreContextMut<T>,
1648        guest_task: TableId<GuestTask>,
1649        callee: SendSyncPtr<VMFuncRef>,
1650        param_count: usize,
1651        result_count: usize,
1652        flags: Option<InstanceFlags>,
1653        async_: bool,
1654        callback: Option<SendSyncPtr<VMFuncRef>>,
1655        post_return: Option<SendSyncPtr<VMFuncRef>>,
1656    ) -> Result<()> {
1657        /// Return a closure which will call the specified function in the scope
1658        /// of the specified task.
1659        ///
1660        /// This will use `GuestTask::lower_params` to lower the parameters, but
1661        /// will not lift the result; instead, it returns a
1662        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1663        /// any, may be lifted.  Note that an async-lifted export will have
1664        /// returned its result using the `task.return` intrinsic (or not
1665        /// returned a result at all, in the case of `task.cancel`), in which
1666        /// case the "result" of this call will either be a callback code or
1667        /// nothing.
1668        ///
1669        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1670        /// the returned closure is called.
1671        unsafe fn make_call<T: 'static>(
1672            store: StoreContextMut<T>,
1673            guest_task: TableId<GuestTask>,
1674            callee: SendSyncPtr<VMFuncRef>,
1675            param_count: usize,
1676            result_count: usize,
1677            flags: Option<InstanceFlags>,
1678        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1679        + Send
1680        + Sync
1681        + 'static
1682        + use<T> {
1683            let token = StoreToken::new(store);
1684            move |store: &mut dyn VMStore| {
1685                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1686                let task = store.concurrent_state_mut().get_mut(guest_task)?;
1687                let may_enter_after_call = task.call_post_return_automatically();
1688                let lower = task.lower_params.take().unwrap();
1689
1690                lower(store, &mut storage[..param_count])?;
1691
1692                let mut store = token.as_context_mut(store);
1693
1694                // SAFETY: Per the contract documented in `make_call's`
1695                // documentation, `callee` must be a valid pointer.
1696                unsafe {
1697                    if let Some(mut flags) = flags {
1698                        flags.set_may_enter(false);
1699                    }
1700                    crate::Func::call_unchecked_raw(
1701                        &mut store,
1702                        callee.as_non_null(),
1703                        NonNull::new(
1704                            &mut storage[..param_count.max(result_count)]
1705                                as *mut [MaybeUninit<ValRaw>] as _,
1706                        )
1707                        .unwrap(),
1708                    )?;
1709                    if let Some(mut flags) = flags {
1710                        flags.set_may_enter(may_enter_after_call);
1711                    }
1712                }
1713
1714                Ok(storage)
1715            }
1716        }
1717
1718        // SAFETY: Per the contract described in this function documentation,
1719        // the `callee` pointer which `call` closes over must be valid when
1720        // called by the closure we queue below.
1721        let call = unsafe {
1722            make_call(
1723                store.as_context_mut(),
1724                guest_task,
1725                callee,
1726                param_count,
1727                result_count,
1728                flags,
1729            )
1730        };
1731
1732        let callee_instance = store.0.concurrent_state_mut().get_mut(guest_task)?.instance;
1733        let fun = if callback.is_some() {
1734            assert!(async_);
1735
1736            Box::new(move |store: &mut dyn VMStore| {
1737                let old_task = store.concurrent_state_mut().guest_task.replace(guest_task);
1738                log::trace!(
1739                    "stackless call: replaced {old_task:?} with {guest_task:?} as current task"
1740                );
1741
1742                store.maybe_push_call_context(guest_task)?;
1743
1744                store.concurrent_state_mut().enter_instance(callee_instance);
1745
1746                // SAFETY: See the documentation for `make_call` to review the
1747                // contract we must uphold for `call` here.
1748                //
1749                // Per the contract described in the `queue_call`
1750                // documentation, the `callee` pointer which `call` closes
1751                // over must be valid.
1752                let storage = call(store)?;
1753
1754                store
1755                    .concurrent_state_mut()
1756                    .exit_instance(callee_instance)?;
1757
1758                store.maybe_pop_call_context(guest_task)?;
1759
1760                let state = store.concurrent_state_mut();
1761                state.guest_task = old_task;
1762                log::trace!("stackless call: restored {old_task:?} as current task");
1763
1764                // SAFETY: `wasmparser` will have validated that the callback
1765                // function returns a `i32` result.
1766                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1767
1768                self.handle_callback_code(store, guest_task, callee_instance, code, true)?;
1769
1770                Ok(())
1771            }) as Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>
1772        } else {
1773            let token = StoreToken::new(store.as_context_mut());
1774            Box::new(move |store: &mut dyn VMStore| {
1775                let old_task = store.concurrent_state_mut().guest_task.replace(guest_task);
1776                log::trace!(
1777                    "stackful call: replaced {old_task:?} with {guest_task:?} as current task",
1778                );
1779
1780                let mut flags = self.id().get(store).instance_flags(callee_instance);
1781
1782                store.maybe_push_call_context(guest_task)?;
1783
1784                // Unless this is a callback-less (i.e. stackful)
1785                // async-lifted export, we need to record that the instance
1786                // cannot be entered until the call returns.
1787                if !async_ {
1788                    store.concurrent_state_mut().enter_instance(callee_instance);
1789                }
1790
1791                // SAFETY: See the documentation for `make_call` to review the
1792                // contract we must uphold for `call` here.
1793                //
1794                // Per the contract described in the `queue_call`
1795                // documentation, the `callee` pointer which `call` closes
1796                // over must be valid.
1797                let storage = call(store)?;
1798
1799                if async_ {
1800                    // This is a callback-less (i.e. stackful) async-lifted
1801                    // export, so there is no post-return function, and
1802                    // either `task.return` or `task.cancel` should have
1803                    // been called.
1804                    if store
1805                        .concurrent_state_mut()
1806                        .get_mut(guest_task)?
1807                        .lift_result
1808                        .is_some()
1809                    {
1810                        return Err(anyhow!(crate::Trap::NoAsyncResult));
1811                    }
1812                } else {
1813                    // This is a sync-lifted export, so now is when we lift the
1814                    // result, optionally call the post-return function, if any,
1815                    // and finally notify any current or future waiters that the
1816                    // subtask has returned.
1817
1818                    let lift = {
1819                        let state = store.concurrent_state_mut();
1820                        state.exit_instance(callee_instance)?;
1821
1822                        assert!(state.get_mut(guest_task)?.result.is_none());
1823
1824                        state.get_mut(guest_task)?.lift_result.take().unwrap()
1825                    };
1826
1827                    // SAFETY: `result_count` represents the number of core Wasm
1828                    // results returned, per `wasmparser`.
1829                    let result = (lift.lift)(store, unsafe {
1830                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1831                            &storage[..result_count],
1832                        )
1833                    })?;
1834
1835                    let post_return_arg = match result_count {
1836                        0 => ValRaw::i32(0),
1837                        // SAFETY: `result_count` represents the number of
1838                        // core Wasm results returned, per `wasmparser`.
1839                        1 => unsafe { storage[0].assume_init() },
1840                        _ => unreachable!(),
1841                    };
1842
1843                    if store
1844                        .concurrent_state_mut()
1845                        .get_mut(guest_task)?
1846                        .call_post_return_automatically()
1847                    {
1848                        unsafe {
1849                            flags.set_may_leave(false);
1850                            flags.set_needs_post_return(false);
1851                        }
1852
1853                        if let Some(func) = post_return {
1854                            let mut store = token.as_context_mut(store);
1855
1856                            // SAFETY: `func` is a valid `*mut VMFuncRef` from
1857                            // either `wasmtime-cranelift`-generated fused adapter
1858                            // code or `component::Options`.  Per `wasmparser`
1859                            // post-return signature validation, we know it takes a
1860                            // single parameter.
1861                            unsafe {
1862                                crate::Func::call_unchecked_raw(
1863                                    &mut store,
1864                                    func.as_non_null(),
1865                                    slice::from_ref(&post_return_arg).into(),
1866                                )?;
1867                            }
1868                        }
1869
1870                        unsafe {
1871                            flags.set_may_leave(true);
1872                            flags.set_may_enter(true);
1873                        }
1874                    }
1875
1876                    self.task_complete(
1877                        store,
1878                        guest_task,
1879                        result,
1880                        Status::Returned,
1881                        post_return_arg,
1882                    )?;
1883                }
1884
1885                store.maybe_pop_call_context(guest_task)?;
1886
1887                let task = store.concurrent_state_mut().get_mut(guest_task)?;
1888
1889                match &task.caller {
1890                    Caller::Host {
1891                        remove_task_automatically,
1892                        ..
1893                    } => {
1894                        if *remove_task_automatically {
1895                            Waitable::Guest(guest_task)
1896                                .delete_from(store.concurrent_state_mut())?;
1897                        }
1898                    }
1899                    Caller::Guest { .. } => {
1900                        task.exited = true;
1901                    }
1902                }
1903
1904                Ok(())
1905            })
1906        };
1907
1908        store
1909            .0
1910            .concurrent_state_mut()
1911            .push_high_priority(WorkItem::GuestCall(GuestCall {
1912                task: guest_task,
1913                kind: GuestCallKind::Start(fun),
1914            }));
1915
1916        Ok(())
1917    }
1918
1919    /// Prepare (but do not start) a guest->guest call.
1920    ///
1921    /// This is called from fused adapter code generated in
1922    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
1923    /// are synthesized Wasm functions which move the parameters from the caller
1924    /// to the callee and the result from the callee to the caller,
1925    /// respectively.  The adapter will call `Self::start_call` immediately
1926    /// after calling this function.
1927    ///
1928    /// SAFETY: All the pointer arguments must be valid pointers to guest
1929    /// entities (and with the expected signatures for the function references
1930    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
1931    unsafe fn prepare_call<T: 'static>(
1932        self,
1933        mut store: StoreContextMut<T>,
1934        start: *mut VMFuncRef,
1935        return_: *mut VMFuncRef,
1936        caller_instance: RuntimeComponentInstanceIndex,
1937        callee_instance: RuntimeComponentInstanceIndex,
1938        task_return_type: TypeTupleIndex,
1939        memory: *mut VMMemoryDefinition,
1940        string_encoding: u8,
1941        caller_info: CallerInfo,
1942    ) -> Result<()> {
1943        self.id().get(store.0).check_may_leave(caller_instance)?;
1944
1945        enum ResultInfo {
1946            Heap { results: u32 },
1947            Stack { result_count: u32 },
1948        }
1949
1950        let result_info = match &caller_info {
1951            CallerInfo::Async {
1952                has_result: true,
1953                params,
1954            } => ResultInfo::Heap {
1955                results: params.last().unwrap().get_u32(),
1956            },
1957            CallerInfo::Async {
1958                has_result: false, ..
1959            } => ResultInfo::Stack { result_count: 0 },
1960            CallerInfo::Sync {
1961                result_count,
1962                params,
1963            } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
1964                results: params.last().unwrap().get_u32(),
1965            },
1966            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
1967                result_count: *result_count,
1968            },
1969        };
1970
1971        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
1972
1973        // Create a new guest task for the call, closing over the `start` and
1974        // `return_` functions to lift the parameters and lower the result,
1975        // respectively.
1976        let start = SendSyncPtr::new(NonNull::new(start).unwrap());
1977        let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
1978        let token = StoreToken::new(store.as_context_mut());
1979        let state = store.0.concurrent_state_mut();
1980        let old_task = state.guest_task.take();
1981        let new_task = GuestTask::new(
1982            state,
1983            Box::new(move |store, dst| {
1984                let mut store = token.as_context_mut(store);
1985                assert!(dst.len() <= MAX_FLAT_PARAMS);
1986                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1987                let count = match caller_info {
1988                    // Async callers, if they have a result, use the last
1989                    // parameter as a return pointer so chop that off if
1990                    // relevant here.
1991                    CallerInfo::Async { params, has_result } => {
1992                        let params = &params[..params.len() - usize::from(has_result)];
1993                        for (param, src) in params.iter().zip(&mut src) {
1994                            src.write(*param);
1995                        }
1996                        params.len()
1997                    }
1998
1999                    // Sync callers forward everything directly.
2000                    CallerInfo::Sync { params, .. } => {
2001                        for (param, src) in params.iter().zip(&mut src) {
2002                            src.write(*param);
2003                        }
2004                        params.len()
2005                    }
2006                };
2007                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2008                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2009                // how it was constructed (see
2010                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2011                // for details) we know it takes count parameters and returns
2012                // `dst.len()` results.
2013                unsafe {
2014                    crate::Func::call_unchecked_raw(
2015                        &mut store,
2016                        start.as_non_null(),
2017                        NonNull::new(
2018                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2019                        )
2020                        .unwrap(),
2021                    )?;
2022                }
2023                dst.copy_from_slice(&src[..dst.len()]);
2024                let state = store.0.concurrent_state_mut();
2025                let task = state.guest_task.unwrap();
2026                Waitable::Guest(task).set_event(
2027                    state,
2028                    Some(Event::Subtask {
2029                        status: Status::Started,
2030                    }),
2031                )?;
2032                Ok(())
2033            }),
2034            LiftResult {
2035                lift: Box::new(move |store, src| {
2036                    // SAFETY: See comment in closure passed as `lower_params`
2037                    // parameter above.
2038                    let mut store = token.as_context_mut(store);
2039                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2040                    if let ResultInfo::Heap { results } = &result_info {
2041                        my_src.push(ValRaw::u32(*results));
2042                    }
2043                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2044                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2045                    // on how it was constructed (see
2046                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2047                    // for details) we know it takes `src.len()` parameters and
2048                    // returns up to 1 result.
2049                    unsafe {
2050                        crate::Func::call_unchecked_raw(
2051                            &mut store,
2052                            return_.as_non_null(),
2053                            my_src.as_mut_slice().into(),
2054                        )?;
2055                    }
2056                    let state = store.0.concurrent_state_mut();
2057                    let task = state.guest_task.unwrap();
2058                    if sync_caller {
2059                        state.get_mut(task)?.sync_result =
2060                            Some(if let ResultInfo::Stack { result_count } = &result_info {
2061                                match result_count {
2062                                    0 => None,
2063                                    1 => Some(my_src[0]),
2064                                    _ => unreachable!(),
2065                                }
2066                            } else {
2067                                None
2068                            });
2069                    }
2070                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2071                }),
2072                ty: task_return_type,
2073                memory: NonNull::new(memory).map(SendSyncPtr::new),
2074                string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2075            },
2076            Caller::Guest {
2077                task: old_task.unwrap(),
2078                instance: caller_instance,
2079            },
2080            None,
2081            callee_instance,
2082        )?;
2083
2084        let guest_task = state.push(new_task)?;
2085
2086        if let Some(old_task) = old_task {
2087            if !state.may_enter(guest_task) {
2088                bail!(crate::Trap::CannotEnterComponent);
2089            }
2090
2091            state.get_mut(old_task)?.subtasks.insert(guest_task);
2092        };
2093
2094        // Make the new task the current one so that `Self::start_call` knows
2095        // which one to start.
2096        state.guest_task = Some(guest_task);
2097        log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
2098
2099        Ok(())
2100    }
2101
2102    /// Call the specified callback function for an async-lifted export.
2103    ///
2104    /// SAFETY: `function` must be a valid reference to a guest function of the
2105    /// correct signature for a callback.
2106    unsafe fn call_callback<T>(
2107        self,
2108        mut store: StoreContextMut<T>,
2109        callee_instance: RuntimeComponentInstanceIndex,
2110        function: SendSyncPtr<VMFuncRef>,
2111        event: Event,
2112        handle: u32,
2113        may_enter_after_call: bool,
2114    ) -> Result<u32> {
2115        let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2116
2117        let (ordinal, result) = event.parts();
2118        let params = &mut [
2119            ValRaw::u32(ordinal),
2120            ValRaw::u32(handle),
2121            ValRaw::u32(result),
2122        ];
2123        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2124        // `wasmtime-cranelift`-generated fused adapter code or
2125        // `component::Options`.  Per `wasmparser` callback signature
2126        // validation, we know it takes three parameters and returns one.
2127        unsafe {
2128            flags.set_may_enter(false);
2129            crate::Func::call_unchecked_raw(
2130                &mut store,
2131                function.as_non_null(),
2132                params.as_mut_slice().into(),
2133            )?;
2134            flags.set_may_enter(may_enter_after_call);
2135        }
2136        Ok(params[0].get_u32())
2137    }
2138
2139    /// Start a guest->guest call previously prepared using
2140    /// `Self::prepare_call`.
2141    ///
2142    /// This is called from fused adapter code generated in
2143    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2144    /// this function immediately after calling `Self::prepare_call`.
2145    ///
2146    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2147    /// functions with the appropriate signatures for the current guest task.
2148    /// If this is a call to an async-lowered import, the actual call may be
2149    /// deferred and run after this function returns, in which case the pointer
2150    /// arguments must also be valid when the call happens.
2151    unsafe fn start_call<T: 'static>(
2152        self,
2153        mut store: StoreContextMut<T>,
2154        callback: *mut VMFuncRef,
2155        post_return: *mut VMFuncRef,
2156        callee: *mut VMFuncRef,
2157        param_count: u32,
2158        result_count: u32,
2159        flags: u32,
2160        storage: Option<&mut [MaybeUninit<ValRaw>]>,
2161    ) -> Result<u32> {
2162        let token = StoreToken::new(store.as_context_mut());
2163        let async_caller = storage.is_none();
2164        let state = store.0.concurrent_state_mut();
2165        let guest_task = state.guest_task.unwrap();
2166        let may_enter_after_call = state.get_mut(guest_task)?.call_post_return_automatically();
2167        let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2168        let param_count = usize::try_from(param_count).unwrap();
2169        assert!(param_count <= MAX_FLAT_PARAMS);
2170        let result_count = usize::try_from(result_count).unwrap();
2171        assert!(result_count <= MAX_FLAT_RESULTS);
2172
2173        let task = state.get_mut(guest_task)?;
2174        if !callback.is_null() {
2175            // We're calling an async-lifted export with a callback, so store
2176            // the callback and related context as part of the task so we can
2177            // call it later when needed.
2178            let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2179            task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2180                let store = token.as_context_mut(store);
2181                unsafe {
2182                    self.call_callback::<T>(
2183                        store,
2184                        runtime_instance,
2185                        callback,
2186                        event,
2187                        handle,
2188                        may_enter_after_call,
2189                    )
2190                }
2191            }));
2192        }
2193
2194        let Caller::Guest {
2195            task: caller,
2196            instance: runtime_instance,
2197        } = &task.caller
2198        else {
2199            // As of this writing, `start_call` is only used for guest->guest
2200            // calls.
2201            unreachable!()
2202        };
2203        let caller = *caller;
2204        let caller_instance = *runtime_instance;
2205
2206        let callee_instance = task.instance;
2207
2208        let instance_flags = if callback.is_null() {
2209            None
2210        } else {
2211            Some(self.id().get(store.0).instance_flags(callee_instance))
2212        };
2213
2214        // Queue the call as a "high priority" work item.
2215        unsafe {
2216            self.queue_call(
2217                store.as_context_mut(),
2218                guest_task,
2219                callee,
2220                param_count,
2221                result_count,
2222                instance_flags,
2223                (flags & START_FLAG_ASYNC_CALLEE) != 0,
2224                NonNull::new(callback).map(SendSyncPtr::new),
2225                NonNull::new(post_return).map(SendSyncPtr::new),
2226            )?;
2227        }
2228
2229        let state = store.0.concurrent_state_mut();
2230
2231        // Use the caller's `GuestTask::sync_call_set` to register interest in
2232        // the subtask...
2233        let guest_waitable = Waitable::Guest(guest_task);
2234        let old_set = guest_waitable.common(state)?.set;
2235        let set = state.get_mut(caller)?.sync_call_set;
2236        guest_waitable.join(state, Some(set))?;
2237
2238        // ... and suspend this fiber temporarily while we wait for it to start.
2239        //
2240        // Note that we _could_ call the callee directly using the current fiber
2241        // rather than suspend this one, but that would make reasoning about the
2242        // event loop more complicated and is probably only worth doing if
2243        // there's a measurable performance benefit.  In addition, it would mean
2244        // blocking the caller if the callee calls a blocking sync-lowered
2245        // import, and as of this writing the spec says we must not do that.
2246        //
2247        // Alternatively, the fused adapter code could be modified to call the
2248        // callee directly without calling a host-provided intrinsic at all (in
2249        // which case it would need to do its own, inline backpressure checks,
2250        // etc.).  Again, we'd want to see a measurable performance benefit
2251        // before committing to such an optimization.  And again, we'd need to
2252        // update the spec to allow that.
2253        let (status, waitable) = loop {
2254            store
2255                .0
2256                .suspend(SuspendReason::Waiting { set, task: caller })?;
2257
2258            let state = store.0.concurrent_state_mut();
2259
2260            let event = guest_waitable.take_event(state)?;
2261            let Some(Event::Subtask { status }) = event else {
2262                unreachable!();
2263            };
2264
2265            log::trace!("status {status:?} for {guest_task:?}");
2266
2267            if status == Status::Returned {
2268                // It returned, so we can stop waiting.
2269                break (status, None);
2270            } else if async_caller {
2271                // It hasn't returned yet, but the caller is calling via an
2272                // async-lowered import, so we generate a handle for the task
2273                // waitable and return the status.
2274                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2275                    .subtask_insert_guest(guest_task.rep())?;
2276                store
2277                    .0
2278                    .concurrent_state_mut()
2279                    .get_mut(guest_task)?
2280                    .common
2281                    .handle = Some(handle);
2282                break (status, Some(handle));
2283            } else {
2284                // The callee hasn't returned yet, and the caller is calling via
2285                // a sync-lowered import, so we loop and keep waiting until the
2286                // callee returns.
2287            }
2288        };
2289
2290        let state = store.0.concurrent_state_mut();
2291
2292        guest_waitable.join(state, old_set)?;
2293
2294        if let Some(storage) = storage {
2295            // The caller used a sync-lowered import to call an async-lifted
2296            // export, in which case the result, if any, has been stashed in
2297            // `GuestTask::sync_result`.
2298            let task = state.get_mut(guest_task)?;
2299            if let Some(result) = task.sync_result.take() {
2300                if let Some(result) = result {
2301                    storage[0] = MaybeUninit::new(result);
2302                }
2303
2304                if task.exited {
2305                    Waitable::Guest(guest_task).delete_from(state)?;
2306                }
2307            } else {
2308                // This means the callee failed to call either `task.return` or
2309                // `task.cancel` before exiting.
2310                return Err(anyhow!(crate::Trap::NoAsyncResult));
2311            }
2312        }
2313
2314        // Reset the current task to point to the caller as it resumes control.
2315        state.guest_task = Some(caller);
2316        log::trace!("popped current task {guest_task:?}; new task is {caller:?}");
2317
2318        Ok(status.pack(waitable))
2319    }
2320
2321    /// Poll the specified future once on behalf of a guest->host call using an
2322    /// async-lowered import.
2323    ///
2324    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2325    /// `Pending`, add it to the set of futures to be polled as part of this
2326    /// instance's event loop until it completes, and then return
2327    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2328    ///
2329    /// Whether the future returns `Ready` immediately or later, the `lower`
2330    /// function will be used to lower the result, if any, into the guest caller's
2331    /// stack and linear memory unless the task has been cancelled.
2332    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2333        self,
2334        mut store: StoreContextMut<T>,
2335        future: impl Future<Output = Result<R>> + Send + 'static,
2336        caller_instance: RuntimeComponentInstanceIndex,
2337        lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2338    ) -> Result<Option<u32>> {
2339        let token = StoreToken::new(store.as_context_mut());
2340        let state = store.0.concurrent_state_mut();
2341        let caller = state.guest_task.unwrap();
2342
2343        // Create an abortable future which hooks calls to poll and manages call
2344        // context state for the future.
2345        let (join_handle, future) = JoinHandle::run(async move {
2346            let mut future = pin!(future);
2347            let mut call_context = None;
2348            future::poll_fn(move |cx| {
2349                // Push the call context for managing any resource borrows
2350                // for the task.
2351                tls::get(|store| {
2352                    if let Some(call_context) = call_context.take() {
2353                        token
2354                            .as_context_mut(store)
2355                            .0
2356                            .component_resource_state()
2357                            .0
2358                            .push(call_context);
2359                    }
2360                });
2361
2362                let result = future.as_mut().poll(cx);
2363
2364                if result.is_pending() {
2365                    // Pop the call context for managing any resource
2366                    // borrows for the task.
2367                    tls::get(|store| {
2368                        call_context = Some(
2369                            token
2370                                .as_context_mut(store)
2371                                .0
2372                                .component_resource_state()
2373                                .0
2374                                .pop()
2375                                .unwrap(),
2376                        );
2377                    });
2378                }
2379                result
2380            })
2381            .await
2382        });
2383
2384        // We create a new host task even though it might complete immediately
2385        // (in which case we won't need to pass a waitable back to the guest).
2386        // If it does complete immediately, we'll remove it before we return.
2387        let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2388
2389        log::trace!("new host task child of {caller:?}: {task:?}");
2390
2391        let mut future = Box::pin(future);
2392
2393        // Finally, poll the future.  We can use a dummy `Waker` here because
2394        // we'll add the future to `ConcurrentState::futures` and poll it
2395        // automatically from the event loop if it doesn't complete immediately
2396        // here.
2397        let poll = tls::set(store.0, || {
2398            future
2399                .as_mut()
2400                .poll(&mut Context::from_waker(&Waker::noop()))
2401        });
2402
2403        Ok(match poll {
2404            Poll::Ready(None) => unreachable!(),
2405            Poll::Ready(Some(result)) => {
2406                // It finished immediately; lower the result and delete the
2407                // task.
2408                lower(store.as_context_mut(), result?)?;
2409                log::trace!("delete host task {task:?} (already ready)");
2410                store.0.concurrent_state_mut().delete(task)?;
2411                None
2412            }
2413            Poll::Pending => {
2414                // It hasn't finished yet; add the future to
2415                // `ConcurrentState::futures` so it will be polled by the event
2416                // loop and allocate a waitable handle to return to the guest.
2417
2418                // Wrap the future in a closure responsible for lowering the result into
2419                // the guest's stack and memory, as well as notifying any waiters that
2420                // the task returned.
2421                let future =
2422                    Box::pin(async move {
2423                        let result = match future.await {
2424                            Some(result) => result?,
2425                            // Task was cancelled; nothing left to do.
2426                            None => return Ok(()),
2427                        };
2428                        tls::get(move |store| {
2429                            // Here we schedule a task to run on a worker fiber to do
2430                            // the lowering since it may involve a call to the guest's
2431                            // realloc function.  This is necessary because calling the
2432                            // guest while there are host embedder frames on the stack
2433                            // is unsound.
2434                            store.concurrent_state_mut().push_high_priority(
2435                                WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2436                                    lower(token.as_context_mut(store), result)?;
2437                                    let state = store.concurrent_state_mut();
2438                                    state.get_mut(task)?.join_handle.take();
2439                                    Waitable::Host(task).set_event(
2440                                        state,
2441                                        Some(Event::Subtask {
2442                                            status: Status::Returned,
2443                                        }),
2444                                    )
2445                                }))),
2446                            );
2447                            Ok(())
2448                        })
2449                    });
2450
2451                store.0.concurrent_state_mut().push_future(future);
2452                let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2453                    .subtask_insert_host(task.rep())?;
2454                store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2455                log::trace!(
2456                    "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2457                );
2458                Some(handle)
2459            }
2460        })
2461    }
2462
2463    /// Implements the `task.return` intrinsic, lifting the result for the
2464    /// current guest task.
2465    pub(crate) fn task_return(
2466        self,
2467        store: &mut dyn VMStore,
2468        caller: RuntimeComponentInstanceIndex,
2469        ty: TypeTupleIndex,
2470        options: OptionsIndex,
2471        storage: &[ValRaw],
2472    ) -> Result<()> {
2473        self.id().get(store).check_may_leave(caller)?;
2474        let state = store.concurrent_state_mut();
2475        let guest_task = state.guest_task.unwrap();
2476        let lift = state
2477            .get_mut(guest_task)?
2478            .lift_result
2479            .take()
2480            .ok_or_else(|| {
2481                anyhow!("`task.return` or `task.cancel` called more than once for current task")
2482            })?;
2483        assert!(state.get_mut(guest_task)?.result.is_none());
2484
2485        let CanonicalOptions {
2486            string_encoding,
2487            data_model,
2488            ..
2489        } = &self.id().get(store).component().env_component().options[options];
2490
2491        let invalid = ty != lift.ty
2492            || string_encoding != &lift.string_encoding
2493            || match data_model {
2494                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2495                    Some(memory) => {
2496                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2497                        let actual = self.id().get(store).runtime_memory(memory);
2498                        expected != actual
2499                    }
2500                    // Memory not specified, meaning it didn't need to be
2501                    // specified per validation, so not invalid.
2502                    None => false,
2503                },
2504                // Always invalid as this isn't supported.
2505                CanonicalOptionsDataModel::Gc { .. } => true,
2506            };
2507
2508        if invalid {
2509            bail!("invalid `task.return` signature and/or options for current task");
2510        }
2511
2512        log::trace!("task.return for {guest_task:?}");
2513
2514        let result = (lift.lift)(store, storage)?;
2515
2516        self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
2517    }
2518
2519    /// Implements the `task.cancel` intrinsic.
2520    pub(crate) fn task_cancel(
2521        self,
2522        store: &mut StoreOpaque,
2523        caller: RuntimeComponentInstanceIndex,
2524    ) -> Result<()> {
2525        self.id().get(store).check_may_leave(caller)?;
2526        let state = store.concurrent_state_mut();
2527        let guest_task = state.guest_task.unwrap();
2528        let task = state.get_mut(guest_task)?;
2529        if !task.cancel_sent {
2530            bail!("`task.cancel` called by task which has not been cancelled")
2531        }
2532        _ = task.lift_result.take().ok_or_else(|| {
2533            anyhow!("`task.return` or `task.cancel` called more than once for current task")
2534        })?;
2535
2536        assert!(task.result.is_none());
2537
2538        log::trace!("task.cancel for {guest_task:?}");
2539
2540        self.task_complete(
2541            store,
2542            guest_task,
2543            Box::new(DummyResult),
2544            Status::ReturnCancelled,
2545            ValRaw::i32(0),
2546        )
2547    }
2548
2549    /// Complete the specified guest task (i.e. indicate that it has either
2550    /// returned a (possibly empty) result or cancelled itself).
2551    ///
2552    /// This will return any resource borrows and notify any current or future
2553    /// waiters that the task has completed.
2554    fn task_complete(
2555        self,
2556        store: &mut StoreOpaque,
2557        guest_task: TableId<GuestTask>,
2558        result: Box<dyn Any + Send + Sync>,
2559        status: Status,
2560        post_return_arg: ValRaw,
2561    ) -> Result<()> {
2562        if store
2563            .concurrent_state_mut()
2564            .get_mut(guest_task)?
2565            .call_post_return_automatically()
2566        {
2567            let (calls, host_table, _, instance) =
2568                store.component_resource_state_with_instance(self);
2569            ResourceTables {
2570                calls,
2571                host_table: Some(host_table),
2572                guest: Some(instance.guest_tables()),
2573            }
2574            .exit_call()?;
2575        } else {
2576            // As of this writing, the only scenario where `call_post_return_automatically`
2577            // would be false for a `GuestTask` is for host-to-guest calls using
2578            // `[Typed]Func::call_async`, in which case the `function_index`
2579            // should be a non-`None` value.
2580            let function_index = store
2581                .concurrent_state_mut()
2582                .get_mut(guest_task)?
2583                .function_index
2584                .unwrap();
2585
2586            self.id()
2587                .get_mut(store)
2588                .post_return_arg_set(function_index, post_return_arg);
2589        }
2590
2591        let state = store.concurrent_state_mut();
2592        let task = state.get_mut(guest_task)?;
2593
2594        if let Caller::Host { tx, .. } = &mut task.caller {
2595            if let Some(tx) = tx.take() {
2596                _ = tx.send(result);
2597            }
2598        } else {
2599            task.result = Some(result);
2600            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2601        }
2602
2603        Ok(())
2604    }
2605
2606    /// Implements the `waitable-set.new` intrinsic.
2607    pub(crate) fn waitable_set_new(
2608        self,
2609        store: &mut StoreOpaque,
2610        caller_instance: RuntimeComponentInstanceIndex,
2611    ) -> Result<u32> {
2612        self.id().get_mut(store).check_may_leave(caller_instance)?;
2613        let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2614        let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2615            .waitable_set_insert(set.rep())?;
2616        log::trace!("new waitable set {set:?} (handle {handle})");
2617        Ok(handle)
2618    }
2619
2620    /// Implements the `waitable-set.drop` intrinsic.
2621    pub(crate) fn waitable_set_drop(
2622        self,
2623        store: &mut StoreOpaque,
2624        caller_instance: RuntimeComponentInstanceIndex,
2625        set: u32,
2626    ) -> Result<()> {
2627        self.id().get_mut(store).check_may_leave(caller_instance)?;
2628        let rep =
2629            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2630
2631        log::trace!("drop waitable set {rep} (handle {set})");
2632
2633        let set = store
2634            .concurrent_state_mut()
2635            .delete(TableId::<WaitableSet>::new(rep))?;
2636
2637        if !set.waiting.is_empty() {
2638            bail!("cannot drop waitable set with waiters");
2639        }
2640
2641        Ok(())
2642    }
2643
2644    /// Implements the `waitable.join` intrinsic.
2645    pub(crate) fn waitable_join(
2646        self,
2647        store: &mut StoreOpaque,
2648        caller_instance: RuntimeComponentInstanceIndex,
2649        waitable_handle: u32,
2650        set_handle: u32,
2651    ) -> Result<()> {
2652        let mut instance = self.id().get_mut(store);
2653        instance.check_may_leave(caller_instance)?;
2654        let waitable =
2655            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2656
2657        let set = if set_handle == 0 {
2658            None
2659        } else {
2660            let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2661
2662            Some(TableId::<WaitableSet>::new(set))
2663        };
2664
2665        log::trace!(
2666            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2667        );
2668
2669        waitable.join(store.concurrent_state_mut(), set)
2670    }
2671
2672    /// Implements the `subtask.drop` intrinsic.
2673    pub(crate) fn subtask_drop(
2674        self,
2675        store: &mut StoreOpaque,
2676        caller_instance: RuntimeComponentInstanceIndex,
2677        task_id: u32,
2678    ) -> Result<()> {
2679        self.id().get_mut(store).check_may_leave(caller_instance)?;
2680        self.waitable_join(store, caller_instance, task_id, 0)?;
2681
2682        let (rep, is_host) =
2683            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2684
2685        let concurrent_state = store.concurrent_state_mut();
2686        let (waitable, expected_caller_instance, delete) = if is_host {
2687            let id = TableId::<HostTask>::new(rep);
2688            let task = concurrent_state.get_mut(id)?;
2689            if task.join_handle.is_some() {
2690                bail!("cannot drop a subtask which has not yet resolved");
2691            }
2692            (Waitable::Host(id), task.caller_instance, true)
2693        } else {
2694            let id = TableId::<GuestTask>::new(rep);
2695            let task = concurrent_state.get_mut(id)?;
2696            if task.lift_result.is_some() {
2697                bail!("cannot drop a subtask which has not yet resolved");
2698            }
2699            if let Caller::Guest { instance, .. } = &task.caller {
2700                (Waitable::Guest(id), *instance, task.exited)
2701            } else {
2702                unreachable!()
2703            }
2704        };
2705
2706        waitable.common(concurrent_state)?.handle = None;
2707
2708        if waitable.take_event(concurrent_state)?.is_some() {
2709            bail!("cannot drop a subtask with an undelivered event");
2710        }
2711
2712        if delete {
2713            waitable.delete_from(concurrent_state)?;
2714        }
2715
2716        // Since waitables can neither be passed between instances nor forged,
2717        // this should never fail unless there's a bug in Wasmtime, but we check
2718        // here to be sure:
2719        assert_eq!(expected_caller_instance, caller_instance);
2720        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2721        Ok(())
2722    }
2723
2724    /// Implements the `waitable-set.wait` intrinsic.
2725    pub(crate) fn waitable_set_wait(
2726        self,
2727        store: &mut StoreOpaque,
2728        caller: RuntimeComponentInstanceIndex,
2729        options: OptionsIndex,
2730        set: u32,
2731        payload: u32,
2732    ) -> Result<u32> {
2733        self.id().get(store).check_may_leave(caller)?;
2734        let &CanonicalOptions {
2735            cancellable,
2736            instance: caller_instance,
2737            ..
2738        } = &self.id().get(store).component().env_component().options[options];
2739        let rep =
2740            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2741
2742        self.waitable_check(
2743            store,
2744            cancellable,
2745            WaitableCheck::Wait(WaitableCheckParams {
2746                set: TableId::new(rep),
2747                options,
2748                payload,
2749            }),
2750        )
2751    }
2752
2753    /// Implements the `waitable-set.poll` intrinsic.
2754    pub(crate) fn waitable_set_poll(
2755        self,
2756        store: &mut StoreOpaque,
2757        caller: RuntimeComponentInstanceIndex,
2758        options: OptionsIndex,
2759        set: u32,
2760        payload: u32,
2761    ) -> Result<u32> {
2762        self.id().get(store).check_may_leave(caller)?;
2763        let &CanonicalOptions {
2764            cancellable,
2765            instance: caller_instance,
2766            ..
2767        } = &self.id().get(store).component().env_component().options[options];
2768        let rep =
2769            self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2770
2771        self.waitable_check(
2772            store,
2773            cancellable,
2774            WaitableCheck::Poll(WaitableCheckParams {
2775                set: TableId::new(rep),
2776                options,
2777                payload,
2778            }),
2779        )
2780    }
2781
2782    /// Implements the `thread.yield` intrinsic.
2783    pub(crate) fn thread_yield(
2784        self,
2785        store: &mut StoreOpaque,
2786        caller: RuntimeComponentInstanceIndex,
2787        cancellable: bool,
2788    ) -> Result<bool> {
2789        self.id().get(store).check_may_leave(caller)?;
2790        self.waitable_check(store, cancellable, WaitableCheck::Yield)
2791            .map(|_| {
2792                if cancellable {
2793                    let state = store.concurrent_state_mut();
2794                    let task = state.guest_task.unwrap();
2795                    if let Some(event) = state.get_mut(task).unwrap().event.take() {
2796                        assert!(matches!(event, Event::Cancelled));
2797                        true
2798                    } else {
2799                        false
2800                    }
2801                } else {
2802                    false
2803                }
2804            })
2805    }
2806
2807    /// Helper function for the `waitable-set.wait`, `waitable-set.poll`, and
2808    /// `yield` intrinsics.
2809    fn waitable_check(
2810        self,
2811        store: &mut StoreOpaque,
2812        cancellable: bool,
2813        check: WaitableCheck,
2814    ) -> Result<u32> {
2815        let guest_task = store.concurrent_state_mut().guest_task.unwrap();
2816
2817        let (wait, set) = match &check {
2818            WaitableCheck::Wait(params) => (true, Some(params.set)),
2819            WaitableCheck::Poll(params) => (false, Some(params.set)),
2820            WaitableCheck::Yield => (false, None),
2821        };
2822
2823        // First, suspend this fiber, allowing any other tasks to run.
2824        store.suspend(SuspendReason::Yielding { task: guest_task })?;
2825
2826        log::trace!("waitable check for {guest_task:?}; set {set:?}");
2827
2828        let state = store.concurrent_state_mut();
2829        let task = state.get_mut(guest_task)?;
2830
2831        // If we're waiting, and there are no events immediately available,
2832        // suspend the fiber until that changes.
2833        if wait {
2834            let set = set.unwrap();
2835
2836            if (task.event.is_none()
2837                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
2838                && state.get_mut(set)?.ready.is_empty()
2839            {
2840                if cancellable {
2841                    let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
2842                    assert!(old.is_none());
2843                }
2844
2845                store.suspend(SuspendReason::Waiting {
2846                    set,
2847                    task: guest_task,
2848                })?;
2849            }
2850        }
2851
2852        log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
2853
2854        let result = match check {
2855            // Deliver any pending events to the guest and return.
2856            WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
2857                let event = self.get_event(store, guest_task, Some(params.set), cancellable)?;
2858
2859                let (ordinal, handle, result) = if wait {
2860                    let (event, waitable) = event.unwrap();
2861                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2862                    let (ordinal, result) = event.parts();
2863                    (ordinal, handle, result)
2864                } else {
2865                    if let Some((event, waitable)) = event {
2866                        let handle = waitable.map(|(_, v)| v).unwrap_or(0);
2867                        let (ordinal, result) = event.parts();
2868                        (ordinal, handle, result)
2869                    } else {
2870                        log::trace!(
2871                            "no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
2872                            params.set
2873                        );
2874                        let (ordinal, result) = Event::None.parts();
2875                        (ordinal, 0, result)
2876                    }
2877                };
2878                let options = Options::new_index(store, self, params.options);
2879                let ptr = func::validate_inbounds::<(u32, u32)>(
2880                    options.memory_mut(store),
2881                    &ValRaw::u32(params.payload),
2882                )?;
2883                options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
2884                options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
2885                Ok(ordinal)
2886            }
2887            WaitableCheck::Yield => Ok(0),
2888        };
2889
2890        result
2891    }
2892
2893    /// Implements the `subtask.cancel` intrinsic.
2894    pub(crate) fn subtask_cancel(
2895        self,
2896        store: &mut StoreOpaque,
2897        caller_instance: RuntimeComponentInstanceIndex,
2898        async_: bool,
2899        task_id: u32,
2900    ) -> Result<u32> {
2901        self.id().get(store).check_may_leave(caller_instance)?;
2902        let (rep, is_host) =
2903            self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
2904        let (waitable, expected_caller_instance) = if is_host {
2905            let id = TableId::<HostTask>::new(rep);
2906            (
2907                Waitable::Host(id),
2908                store.concurrent_state_mut().get_mut(id)?.caller_instance,
2909            )
2910        } else {
2911            let id = TableId::<GuestTask>::new(rep);
2912            if let Caller::Guest { instance, .. } =
2913                &store.concurrent_state_mut().get_mut(id)?.caller
2914            {
2915                (Waitable::Guest(id), *instance)
2916            } else {
2917                unreachable!()
2918            }
2919        };
2920        // Since waitables can neither be passed between instances nor forged,
2921        // this should never fail unless there's a bug in Wasmtime, but we check
2922        // here to be sure:
2923        assert_eq!(expected_caller_instance, caller_instance);
2924
2925        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
2926
2927        let concurrent_state = store.concurrent_state_mut();
2928        if let Waitable::Host(host_task) = waitable {
2929            if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
2930                handle.abort();
2931                return Ok(Status::ReturnCancelled as u32);
2932            }
2933        } else {
2934            let caller = concurrent_state.guest_task.unwrap();
2935            let guest_task = TableId::<GuestTask>::new(rep);
2936            let task = concurrent_state.get_mut(guest_task)?;
2937            if task.lower_params.is_some() {
2938                task.lower_params = None;
2939                task.lift_result = None;
2940
2941                // Not yet started; cancel and remove from pending
2942                let callee_instance = task.instance;
2943
2944                let kind = concurrent_state
2945                    .instance_state(callee_instance)
2946                    .pending
2947                    .remove(&guest_task);
2948
2949                if kind.is_none() {
2950                    bail!("`subtask.cancel` called after terminal status delivered");
2951                }
2952
2953                return Ok(Status::StartCancelled as u32);
2954            } else if task.lift_result.is_some() {
2955                // Started, but not yet returned or cancelled; send the
2956                // `CANCELLED` event
2957                task.cancel_sent = true;
2958                // Note that this might overwrite an event that was set earlier
2959                // (e.g. `Event::None` if the task is yielding, or
2960                // `Event::Cancelled` if it was already cancelled), but that's
2961                // okay -- this should supersede the previous state.
2962                task.event = Some(Event::Cancelled);
2963                if let Some(set) = task.wake_on_cancel.take() {
2964                    let item = match concurrent_state
2965                        .get_mut(set)?
2966                        .waiting
2967                        .remove(&guest_task)
2968                        .unwrap()
2969                    {
2970                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
2971                        WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
2972                            task: guest_task,
2973                            kind: GuestCallKind::DeliverEvent {
2974                                instance,
2975                                set: None,
2976                            },
2977                        }),
2978                    };
2979                    concurrent_state.push_high_priority(item);
2980
2981                    store.suspend(SuspendReason::Yielding { task: caller })?;
2982                }
2983
2984                let concurrent_state = store.concurrent_state_mut();
2985                let task = concurrent_state.get_mut(guest_task)?;
2986                if task.lift_result.is_some() {
2987                    // Still not yet returned or cancelled; if `async_`, return
2988                    // `BLOCKED`; otherwise wait
2989                    if async_ {
2990                        return Ok(BLOCKED);
2991                    } else {
2992                        store.wait_for_event(Waitable::Guest(guest_task))?;
2993                    }
2994                }
2995            }
2996        }
2997
2998        let event = waitable.take_event(store.concurrent_state_mut())?;
2999        if let Some(Event::Subtask {
3000            status: status @ (Status::Returned | Status::ReturnCancelled),
3001        }) = event
3002        {
3003            Ok(status as u32)
3004        } else {
3005            bail!("`subtask.cancel` called after terminal status delivered");
3006        }
3007    }
3008
3009    pub(crate) fn context_get(
3010        self,
3011        store: &mut StoreOpaque,
3012        caller: RuntimeComponentInstanceIndex,
3013        slot: u32,
3014    ) -> Result<u32> {
3015        self.id().get(store).check_may_leave(caller)?;
3016        store.concurrent_state_mut().context_get(slot)
3017    }
3018
3019    pub(crate) fn context_set(
3020        self,
3021        store: &mut StoreOpaque,
3022        caller: RuntimeComponentInstanceIndex,
3023        slot: u32,
3024        value: u32,
3025    ) -> Result<()> {
3026        self.id().get(store).check_may_leave(caller)?;
3027        store.concurrent_state_mut().context_set(slot, value)
3028    }
3029}
3030
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    ///
    /// `storage` must point to at least `storage_len` valid `ValRaw` items;
    /// the `StoreInner` implementation copies them out as the call's
    /// parameters.
    ///
    /// `result_count` doubles as a selector in the `StoreInner`
    /// implementation: the sentinel values `PREPARE_ASYNC_NO_RESULT` and
    /// `PREPARE_ASYNC_WITH_RESULT` describe an async caller without or with a
    /// result, and any other value is taken as the result count of a sync
    /// caller.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage` must point to at least `storage_len` items; the `StoreInner`
    /// implementation hands it to `Instance::start_call` as a mutable slice.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    ///
    /// On success returns the `u32` produced by `Instance::start_call` (in
    /// the `StoreInner` implementation).
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    ///
    /// `future` is the guest's handle and `address` the guest address of the
    /// payload; on success the encoded return code is returned.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    ///
    /// `future` is the guest's handle and `address` the guest address of the
    /// destination; on success the encoded return code is returned.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    ///
    /// `count` is the number of items offered starting at guest address
    /// `address`; on success the encoded return code is returned.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    ///
    /// `count` is the number of items requested into guest address `address`;
    /// on success the encoded return code is returned.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe one payload item; the
    /// `StoreInner` implementation packages them as a `FlatAbi`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe one payload item; the
    /// `StoreInner` implementation packages them as a `FlatAbi`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    ///
    /// `err_ctx_handle` is the guest's handle to the error context and
    /// `debug_msg_address` the guest address associated with the message.
    // NOTE(review): presumably the message is written to `debug_msg_address`
    // using `options` -- confirm against `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;
}
3189
3190/// SAFETY: See trait docs.
3191impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3192    unsafe fn prepare_call(
3193        &mut self,
3194        instance: Instance,
3195        memory: *mut VMMemoryDefinition,
3196        start: *mut VMFuncRef,
3197        return_: *mut VMFuncRef,
3198        caller_instance: RuntimeComponentInstanceIndex,
3199        callee_instance: RuntimeComponentInstanceIndex,
3200        task_return_type: TypeTupleIndex,
3201        string_encoding: u8,
3202        result_count_or_max_if_async: u32,
3203        storage: *mut ValRaw,
3204        storage_len: usize,
3205    ) -> Result<()> {
3206        // SAFETY: The `wasmtime_cranelift`-generated code that calls
3207        // this method will have ensured that `storage` is a valid
3208        // pointer containing at least `storage_len` items.
3209        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3210
3211        unsafe {
3212            instance.prepare_call(
3213                StoreContextMut(self),
3214                start,
3215                return_,
3216                caller_instance,
3217                callee_instance,
3218                task_return_type,
3219                memory,
3220                string_encoding,
3221                match result_count_or_max_if_async {
3222                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3223                        params,
3224                        has_result: false,
3225                    },
3226                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3227                        params,
3228                        has_result: true,
3229                    },
3230                    result_count => CallerInfo::Sync {
3231                        params,
3232                        result_count,
3233                    },
3234                },
3235            )
3236        }
3237    }
3238
3239    unsafe fn sync_start(
3240        &mut self,
3241        instance: Instance,
3242        callback: *mut VMFuncRef,
3243        callee: *mut VMFuncRef,
3244        param_count: u32,
3245        storage: *mut MaybeUninit<ValRaw>,
3246        storage_len: usize,
3247    ) -> Result<()> {
3248        unsafe {
3249            instance
3250                .start_call(
3251                    StoreContextMut(self),
3252                    callback,
3253                    ptr::null_mut(),
3254                    callee,
3255                    param_count,
3256                    1,
3257                    START_FLAG_ASYNC_CALLEE,
3258                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
3259                    // this method will have ensured that `storage` is a valid
3260                    // pointer containing at least `storage_len` items.
3261                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3262                )
3263                .map(drop)
3264        }
3265    }
3266
3267    unsafe fn async_start(
3268        &mut self,
3269        instance: Instance,
3270        callback: *mut VMFuncRef,
3271        post_return: *mut VMFuncRef,
3272        callee: *mut VMFuncRef,
3273        param_count: u32,
3274        result_count: u32,
3275        flags: u32,
3276    ) -> Result<u32> {
3277        unsafe {
3278            instance.start_call(
3279                StoreContextMut(self),
3280                callback,
3281                post_return,
3282                callee,
3283                param_count,
3284                result_count,
3285                flags,
3286                None,
3287            )
3288        }
3289    }
3290
3291    fn future_write(
3292        &mut self,
3293        instance: Instance,
3294        caller: RuntimeComponentInstanceIndex,
3295        ty: TypeFutureTableIndex,
3296        options: OptionsIndex,
3297        future: u32,
3298        address: u32,
3299    ) -> Result<u32> {
3300        instance.id().get(self).check_may_leave(caller)?;
3301        instance
3302            .guest_write(
3303                StoreContextMut(self),
3304                TransmitIndex::Future(ty),
3305                options,
3306                None,
3307                future,
3308                address,
3309                1,
3310            )
3311            .map(|result| result.encode())
3312    }
3313
3314    fn future_read(
3315        &mut self,
3316        instance: Instance,
3317        caller: RuntimeComponentInstanceIndex,
3318        ty: TypeFutureTableIndex,
3319        options: OptionsIndex,
3320        future: u32,
3321        address: u32,
3322    ) -> Result<u32> {
3323        instance.id().get(self).check_may_leave(caller)?;
3324        instance
3325            .guest_read(
3326                StoreContextMut(self),
3327                TransmitIndex::Future(ty),
3328                options,
3329                None,
3330                future,
3331                address,
3332                1,
3333            )
3334            .map(|result| result.encode())
3335    }
3336
3337    fn stream_write(
3338        &mut self,
3339        instance: Instance,
3340        caller: RuntimeComponentInstanceIndex,
3341        ty: TypeStreamTableIndex,
3342        options: OptionsIndex,
3343        stream: u32,
3344        address: u32,
3345        count: u32,
3346    ) -> Result<u32> {
3347        instance.id().get(self).check_may_leave(caller)?;
3348        instance
3349            .guest_write(
3350                StoreContextMut(self),
3351                TransmitIndex::Stream(ty),
3352                options,
3353                None,
3354                stream,
3355                address,
3356                count,
3357            )
3358            .map(|result| result.encode())
3359    }
3360
3361    fn stream_read(
3362        &mut self,
3363        instance: Instance,
3364        caller: RuntimeComponentInstanceIndex,
3365        ty: TypeStreamTableIndex,
3366        options: OptionsIndex,
3367        stream: u32,
3368        address: u32,
3369        count: u32,
3370    ) -> Result<u32> {
3371        instance.id().get(self).check_may_leave(caller)?;
3372        instance
3373            .guest_read(
3374                StoreContextMut(self),
3375                TransmitIndex::Stream(ty),
3376                options,
3377                None,
3378                stream,
3379                address,
3380                count,
3381            )
3382            .map(|result| result.encode())
3383    }
3384
3385    fn future_drop_writable(
3386        &mut self,
3387        instance: Instance,
3388        caller: RuntimeComponentInstanceIndex,
3389        ty: TypeFutureTableIndex,
3390        writer: u32,
3391    ) -> Result<()> {
3392        instance.id().get(self).check_may_leave(caller)?;
3393        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3394    }
3395
3396    fn flat_stream_write(
3397        &mut self,
3398        instance: Instance,
3399        caller: RuntimeComponentInstanceIndex,
3400        ty: TypeStreamTableIndex,
3401        options: OptionsIndex,
3402        payload_size: u32,
3403        payload_align: u32,
3404        stream: u32,
3405        address: u32,
3406        count: u32,
3407    ) -> Result<u32> {
3408        instance.id().get(self).check_may_leave(caller)?;
3409        instance
3410            .guest_write(
3411                StoreContextMut(self),
3412                TransmitIndex::Stream(ty),
3413                options,
3414                Some(FlatAbi {
3415                    size: payload_size,
3416                    align: payload_align,
3417                }),
3418                stream,
3419                address,
3420                count,
3421            )
3422            .map(|result| result.encode())
3423    }
3424
3425    fn flat_stream_read(
3426        &mut self,
3427        instance: Instance,
3428        caller: RuntimeComponentInstanceIndex,
3429        ty: TypeStreamTableIndex,
3430        options: OptionsIndex,
3431        payload_size: u32,
3432        payload_align: u32,
3433        stream: u32,
3434        address: u32,
3435        count: u32,
3436    ) -> Result<u32> {
3437        instance.id().get(self).check_may_leave(caller)?;
3438        instance
3439            .guest_read(
3440                StoreContextMut(self),
3441                TransmitIndex::Stream(ty),
3442                options,
3443                Some(FlatAbi {
3444                    size: payload_size,
3445                    align: payload_align,
3446                }),
3447                stream,
3448                address,
3449                count,
3450            )
3451            .map(|result| result.encode())
3452    }
3453
3454    fn stream_drop_writable(
3455        &mut self,
3456        instance: Instance,
3457        caller: RuntimeComponentInstanceIndex,
3458        ty: TypeStreamTableIndex,
3459        writer: u32,
3460    ) -> Result<()> {
3461        instance.id().get(self).check_may_leave(caller)?;
3462        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3463    }
3464
3465    fn error_context_debug_message(
3466        &mut self,
3467        instance: Instance,
3468        caller: RuntimeComponentInstanceIndex,
3469        ty: TypeComponentLocalErrorContextTableIndex,
3470        options: OptionsIndex,
3471        err_ctx_handle: u32,
3472        debug_msg_address: u32,
3473    ) -> Result<()> {
3474        instance.id().get(self).check_may_leave(caller)?;
3475        instance.error_context_debug_message(
3476            StoreContextMut(self),
3477            ty,
3478            options,
3479            err_ctx_handle,
3480            debug_msg_address,
3481        )
3482    }
3483}
3484
/// A pinned, boxed, type-erased future that resolves to `Result<()>` and is
/// required to be `Send + 'static`.
// NOTE(review): presumably the future that drives a host task; its uses are
// outside this part of the file -- confirm.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3486
/// Represents the state of a pending host task.
struct HostTask {
    /// State shared by every kind of waitable (pending event, owning set,
    /// guest-visible handle); see `WaitableCommon`.
    common: WaitableCommon,
    /// The component instance index supplied as the caller when this task was
    /// created.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Handle to the task, if any. The `subtask.cancel` implementation takes
    /// this handle (leaving `None`) and calls `abort` on it.
    join_handle: Option<JoinHandle>,
}
3493
3494impl HostTask {
3495    fn new(
3496        caller_instance: RuntimeComponentInstanceIndex,
3497        join_handle: Option<JoinHandle>,
3498    ) -> Self {
3499        Self {
3500            common: WaitableCommon::default(),
3501            caller_instance,
3502            join_handle,
3503        }
3504    }
3505}
3506
impl TableDebug for HostTask {
    /// Returns the fixed type name `"HostTask"`.
    // NOTE(review): presumably used to label table entries in diagnostics --
    // confirm against the `TableDebug` definition.
    fn type_name() -> &'static str {
        "HostTask"
    }
}
3512
/// Boxed closure type stored in `GuestTask::callback`, used to call the
/// callback function of an async-lifted export.
///
/// The closure receives the store, a component instance index, an `Event`, and
/// a `u32`, and yields a `u32` on success. It must be `Send + Sync + 'static`.
// NOTE(review): the exact meaning of the `u32` argument and return value is not
// visible in this part of the file -- confirm at the call site.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;
3519
/// Represents the caller of a given guest task.
///
/// A task's caller may be rewritten by `GuestTask::dispose`, which re-parents
/// still-pending subtasks of a disposed task to that task's own caller.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        ///
        /// This is `None` for subtasks that `GuestTask::dispose` re-parents to
        /// the host.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Channel to notify once all subtasks spawned by this caller have
        /// completed.
        ///
        /// Note that we'll never actually send anything to this channel;
        /// dropping it when the refcount goes to zero is sufficient to notify
        /// the receiver.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// If true, remove the task from the concurrent state that owns it
        /// automatically after it completes.
        ///
        /// Set to `true` for subtasks re-parented to the host.
        remove_task_automatically: bool,
        /// If true, call `post-return` function (if any) automatically.
        ///
        /// Set to `true` for subtasks re-parented to the host.
        call_post_return_automatically: bool,
    },
    /// Another guest task called the guest task.
    Guest {
        /// The id of the caller.
        task: TableId<GuestTask>,
        /// The instance to use to enforce reentrance rules.
        ///
        /// Note that this might not be the same as the instance the caller task
        /// started executing in given that one or more synchronous guest->guest
        /// calls may have occurred involving multiple instances.
        instance: RuntimeComponentInstanceIndex,
    },
}
3551
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// The closure that lifts the result.
    lift: RawLift,
    /// Tuple type index associated with the result.
    // NOTE(review): presumably the type a `task.return` call is validated
    // against -- confirm at the use site.
    ty: TypeTupleIndex,
    /// Pointer to a memory definition, if any.
    // NOTE(review): presumably the linear memory the result is lifted from --
    // confirm at the use site.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use when lifting.
    string_encoding: StringEncoding,
}
3560
/// Represents a pending guest task.
struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// Populated at construction. The `subtask.cancel` implementation treats
    /// `Some` here as "not yet started" and clears it when cancelling.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Populated at construction. For a task whose `lower_params` is already
    /// `None`, the `subtask.cancel` implementation treats `Some` here as
    /// "started, but not yet returned or cancelled".
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// A place to stash the call context for managing resource borrows while
    /// switching between guest tasks.
    ///
    /// Starts out as `Some(CallContext::default())`.
    call_context: Option<CallContext>,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: Option<Option<ValRaw>>,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    ///
    /// Set by the `subtask.cancel` implementation when it sends the
    /// `CANCELLED` event to a started task.
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    ///
    /// Both slots start out as zero.
    context: [u32; 2],
    /// Pending guest subtasks created by this task (directly or indirectly).
    ///
    /// This is used to re-parent subtasks which are still running when their
    /// parent task is disposed.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// Allocated in `GuestTask::new` and deleted in `GuestTask::dispose`.
    sync_call_set: TableId<WaitableSet>,
    /// The instance to which the exported function for this guest task belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeComponentInstanceIndex,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// If present, indicates that the task is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    ///
    /// Cleared (taken) both when the task is woken by cancellation and when
    /// `Waitable::mark_ready` wakes it to deliver an event.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The `ExportIndex` of the guest function being called, if known.
    ///
    /// `GuestTask::new` leaves this as `None`.
    function_index: Option<ExportIndex>,
    /// Whether or not the task has exited.
    ///
    /// `GuestTask::dispose` deletes re-parented subtasks for which this is
    /// already `true`.
    exited: bool,
}
3616
3617impl GuestTask {
3618    fn new(
3619        state: &mut ConcurrentState,
3620        lower_params: RawLower,
3621        lift_result: LiftResult,
3622        caller: Caller,
3623        callback: Option<CallbackFn>,
3624        component_instance: RuntimeComponentInstanceIndex,
3625    ) -> Result<Self> {
3626        let sync_call_set = state.push(WaitableSet::default())?;
3627
3628        Ok(Self {
3629            common: WaitableCommon::default(),
3630            lower_params: Some(lower_params),
3631            lift_result: Some(lift_result),
3632            result: None,
3633            callback,
3634            caller,
3635            call_context: Some(CallContext::default()),
3636            sync_result: None,
3637            cancel_sent: false,
3638            starting_sent: false,
3639            context: [0u32; 2],
3640            subtasks: HashSet::new(),
3641            sync_call_set,
3642            instance: component_instance,
3643            event: None,
3644            wake_on_cancel: None,
3645            function_index: None,
3646            exited: false,
3647        })
3648    }
3649
3650    /// Dispose of this guest task, reparenting any pending subtasks to the
3651    /// caller.
3652    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
3653        // If there are not-yet-delivered completion events for subtasks in
3654        // `self.sync_call_set`, recursively dispose of those subtasks as well.
3655        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
3656            if let Some(Event::Subtask {
3657                status: Status::Returned | Status::ReturnCancelled,
3658            }) = waitable.common(state)?.event
3659            {
3660                waitable.delete_from(state)?;
3661            }
3662        }
3663
3664        state.delete(self.sync_call_set)?;
3665
3666        // Reparent any pending subtasks to the caller.
3667        match &self.caller {
3668            Caller::Guest {
3669                task,
3670                instance: runtime_instance,
3671            } => {
3672                let task_mut = state.get_mut(*task)?;
3673                let present = task_mut.subtasks.remove(&me);
3674                assert!(present);
3675
3676                for subtask in &self.subtasks {
3677                    task_mut.subtasks.insert(*subtask);
3678                }
3679
3680                for subtask in &self.subtasks {
3681                    state.get_mut(*subtask)?.caller = Caller::Guest {
3682                        task: *task,
3683                        instance: *runtime_instance,
3684                    };
3685                }
3686            }
3687            Caller::Host { exit_tx, .. } => {
3688                for subtask in &self.subtasks {
3689                    state.get_mut(*subtask)?.caller = Caller::Host {
3690                        tx: None,
3691                        // Clone `exit_tx` to ensure that it is only dropped
3692                        // once all transitive subtasks of the host call have
3693                        // exited:
3694                        exit_tx: exit_tx.clone(),
3695                        remove_task_automatically: true,
3696                        call_post_return_automatically: true,
3697                    };
3698                }
3699            }
3700        }
3701
3702        for subtask in self.subtasks {
3703            if state.get_mut(subtask)?.exited {
3704                Waitable::Guest(subtask).delete_from(state)?;
3705            }
3706        }
3707
3708        Ok(())
3709    }
3710
3711    fn call_post_return_automatically(&self) -> bool {
3712        matches!(
3713            self.caller,
3714            Caller::Guest { .. }
3715                | Caller::Host {
3716                    call_post_return_automatically: true,
3717                    ..
3718                }
3719        )
3720    }
3721}
3722
impl TableDebug for GuestTask {
    /// Returns the fixed type name `"GuestTask"`.
    // NOTE(review): presumably used to label table entries in diagnostics --
    // confirm against the `TableDebug` definition.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
3728
/// Represents state common to all kinds of waitables.
///
/// All fields default to `None`.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    ///
    /// Written by `Waitable::set_event` and moved out by
    /// `Waitable::take_event`.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    ///
    /// Updated by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
3739
/// Represents a Component Model Async `waitable`.
///
/// This is a small `Copy` value wrapping a typed table id; `Waitable::rep`
/// returns the underlying id. Values are stored by value in a waitable set's
/// `ready` collection (see `Waitable::mark_ready`).
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
3750
3751impl Waitable {
3752    /// Retrieve the `Waitable` corresponding to the specified guest-visible
3753    /// handle.
3754    fn from_instance(
3755        state: Pin<&mut ComponentInstance>,
3756        caller_instance: RuntimeComponentInstanceIndex,
3757        waitable: u32,
3758    ) -> Result<Self> {
3759        use crate::runtime::vm::component::Waitable;
3760
3761        let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
3762
3763        Ok(match kind {
3764            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
3765            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
3766            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
3767        })
3768    }
3769
3770    /// Retrieve the host-visible identifier for this `Waitable`.
3771    fn rep(&self) -> u32 {
3772        match self {
3773            Self::Host(id) => id.rep(),
3774            Self::Guest(id) => id.rep(),
3775            Self::Transmit(id) => id.rep(),
3776        }
3777    }
3778
3779    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
3780    /// remove it from any set it may currently belong to (when `set` is
3781    /// `None`).
3782    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
3783        log::trace!("waitable {self:?} join set {set:?}",);
3784
3785        let old = mem::replace(&mut self.common(state)?.set, set);
3786
3787        if let Some(old) = old {
3788            match *self {
3789                Waitable::Host(id) => state.remove_child(id, old),
3790                Waitable::Guest(id) => state.remove_child(id, old),
3791                Waitable::Transmit(id) => state.remove_child(id, old),
3792            }?;
3793
3794            state.get_mut(old)?.ready.remove(self);
3795        }
3796
3797        if let Some(set) = set {
3798            match *self {
3799                Waitable::Host(id) => state.add_child(id, set),
3800                Waitable::Guest(id) => state.add_child(id, set),
3801                Waitable::Transmit(id) => state.add_child(id, set),
3802            }?;
3803
3804            if self.common(state)?.event.is_some() {
3805                self.mark_ready(state)?;
3806            }
3807        }
3808
3809        Ok(())
3810    }
3811
3812    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
3813    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
3814        Ok(match self {
3815            Self::Host(id) => &mut state.get_mut(*id)?.common,
3816            Self::Guest(id) => &mut state.get_mut(*id)?.common,
3817            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
3818        })
3819    }
3820
3821    /// Set or clear the pending event for this waitable and either deliver it
3822    /// to the first waiter, if any, or mark it as ready to be delivered to the
3823    /// next waiter that arrives.
3824    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
3825        log::trace!("set event for {self:?}: {event:?}");
3826        self.common(state)?.event = event;
3827        self.mark_ready(state)
3828    }
3829
3830    /// Take the pending event from this waitable, leaving `None` in its place.
3831    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
3832        let common = self.common(state)?;
3833        let event = common.event.take();
3834        if let Some(set) = self.common(state)?.set {
3835            state.get_mut(set)?.ready.remove(self);
3836        }
3837        Ok(event)
3838    }
3839
3840    /// Deliver the current event for this waitable to the first waiter, if any,
3841    /// or else mark it as ready to be delivered to the next waiter that
3842    /// arrives.
3843    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
3844        if let Some(set) = self.common(state)?.set {
3845            state.get_mut(set)?.ready.insert(*self);
3846            if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
3847                let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
3848                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
3849
3850                let item = match mode {
3851                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3852                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3853                        task,
3854                        kind: GuestCallKind::DeliverEvent {
3855                            instance,
3856                            set: Some(set),
3857                        },
3858                    }),
3859                };
3860                state.push_high_priority(item);
3861            }
3862        }
3863        Ok(())
3864    }
3865
3866    /// Handle the imminent delivery of the specified event, e.g. by updating
3867    /// the state of the stream or future.
3868    fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
3869        match event {
3870            Event::FutureRead {
3871                pending: Some((ty, handle)),
3872                ..
3873            }
3874            | Event::FutureWrite {
3875                pending: Some((ty, handle)),
3876                ..
3877            } => {
3878                let runtime_instance = instance.component().types()[ty].instance;
3879                let (rep, state) = instance.guest_tables().0[runtime_instance]
3880                    .future_rep(ty, handle)
3881                    .unwrap();
3882                assert_eq!(rep, self.rep());
3883                assert_eq!(*state, TransmitLocalState::Busy);
3884                *state = match event {
3885                    Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
3886                    Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
3887                    _ => unreachable!(),
3888                };
3889            }
3890            Event::StreamRead {
3891                pending: Some((ty, handle)),
3892                code,
3893            }
3894            | Event::StreamWrite {
3895                pending: Some((ty, handle)),
3896                code,
3897            } => {
3898                let runtime_instance = instance.component().types()[ty].instance;
3899                let (rep, state) = instance.guest_tables().0[runtime_instance]
3900                    .stream_rep(ty, handle)
3901                    .unwrap();
3902                assert_eq!(rep, self.rep());
3903                assert_eq!(*state, TransmitLocalState::Busy);
3904                let done = matches!(code, ReturnCode::Dropped(_));
3905                *state = match event {
3906                    Event::StreamRead { .. } => TransmitLocalState::Read { done },
3907                    Event::StreamWrite { .. } => TransmitLocalState::Write { done },
3908                    _ => unreachable!(),
3909                };
3910            }
3911            _ => {}
3912        }
3913    }
3914
3915    /// Remove this waitable from the instance's rep table.
3916    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
3917        match self {
3918            Self::Host(task) => {
3919                log::trace!("delete host task {task:?}");
3920                state.delete(*task)?;
3921            }
3922            Self::Guest(task) => {
3923                log::trace!("delete guest task {task:?}");
3924                state.delete(*task)?.dispose(state, *task)?;
3925            }
3926            Self::Transmit(task) => {
3927                state.delete(*task)?;
3928            }
3929        }
3930
3931        Ok(())
3932    }
3933}
3934
3935impl fmt::Debug for Waitable {
3936    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3937        match self {
3938            Self::Host(id) => write!(f, "{id:?}"),
3939            Self::Guest(id) => write!(f, "{id:?}"),
3940            Self::Transmit(id) => write!(f, "{id:?}"),
3941        }
3942    }
3943}
3944
/// Represents a Component Model Async `waitable-set`.
///
/// A default-constructed set has no ready waitables and no waiting tasks.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ///
    /// `Waitable::mark_ready` inserts into this collection and
    /// `Waitable::take_event` removes from it.
    ready: BTreeSet<Waitable>,
    /// Which guest tasks are currently waiting on this set, if any, along with
    /// how each should be resumed.
    ///
    /// `Waitable::mark_ready` wakes at most one waiter per call by popping the
    /// first entry of this map.
    waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
}
3953
impl TableDebug for WaitableSet {
    /// Human-readable name for this table entry type.
    // NOTE(review): presumably used when formatting `TableId<WaitableSet>` --
    // confirm against the `TableDebug` trait definition.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
3959
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure receives the store and the (possibly uninitialized) slice of
/// core Wasm values into which the parameters should be written.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
3963
/// Type-erased closure to lift the result for a guest task.
///
/// The closure receives the store and the raw core Wasm result values and
/// produces a boxed, type-erased result (the same type as `LiftedResult`).
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
3968
/// Type-erased result of a guest task, which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;
3973
/// Placeholder used to return a result from a `LiftFn` when the actual result
/// has already been lowered to a guest task's stack and linear memory, so
/// there is nothing meaningful left to hand back (see `LiftedResult`).
struct DummyResult;
3977
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
struct InstanceState {
    /// Backpressure counter for this instance; backpressure is enabled
    /// whenever this is greater than zero.
    ///
    /// Updated via `ConcurrentState::backpressure_modify`.
    backpressure: u16,
    /// Whether this instance is currently barred from being entered.
    ///
    /// Set by `ConcurrentState::enter_instance` and cleared by
    /// `ConcurrentState::exit_instance`.
    do_not_enter: bool,
    /// Pending calls for this instance which require backpressure to be
    /// disabled (i.e. `Self::backpressure` to be zero) and/or
    /// `Self::do_not_enter` to be false before they can proceed.
    ///
    /// See `ConcurrentState::partition_pending` for where ready entries are
    /// moved to the "high priority" work queue.
    pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
}
3989
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running guest task, if any.
    guest_task: Option<TableId<GuestTask>>,
    /// The set of pending host and background tasks, if any.
    ///
    /// See `StoreContextMut::poll_until` (the event loop, per the module-level
    /// docs) for where we temporarily take this out, poll it, then put it back
    /// to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// Per (sub-)component instance states.
    ///
    /// See `InstanceState` for details and note that this map is lazily
    /// populated as needed.
    // TODO: this can and should be a `PrimaryMap`
    instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
    /// The "high priority" work queue for the event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for the event loop.
    low_priority: Vec<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a
    /// given stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4037
4038impl Default for ConcurrentState {
4039    fn default() -> Self {
4040        Self {
4041            guest_task: None,
4042            table: AlwaysMut::new(ResourceTable::new()),
4043            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4044            instance_states: HashMap::new(),
4045            high_priority: Vec::new(),
4046            low_priority: Vec::new(),
4047            suspend_reason: None,
4048            worker: None,
4049            worker_item: None,
4050            global_error_context_ref_counts: BTreeMap::new(),
4051        }
4052    }
4053}
4054
4055impl ConcurrentState {
4056    /// Take ownership of any fibers and futures owned by this object.
4057    ///
4058    /// This should be used when disposing of the `Store` containing this object
4059    /// in order to gracefully resolve any and all fibers using
4060    /// `StoreFiber::dispose`.  This is necessary to avoid possible
4061    /// use-after-free bugs due to fibers which may still have access to the
4062    /// `Store`.
4063    ///
4064    /// Additionally, the futures collected with this function should be dropped
4065    /// within a `tls::set` call, which will ensure than any futures closing
4066    /// over an `&Accessor` will have access to the store when dropped, allowing
4067    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4068    /// panicking.
4069    ///
4070    /// Note that this will leave the object in an inconsistent and unusable
4071    /// state, so it should only be used just prior to dropping it.
4072    pub(crate) fn take_fibers_and_futures(
4073        &mut self,
4074        fibers: &mut Vec<StoreFiber<'static>>,
4075        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4076    ) {
4077        for entry in self.table.get_mut().iter_mut() {
4078            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4079                for mode in mem::take(&mut set.waiting).into_values() {
4080                    if let WaitMode::Fiber(fiber) = mode {
4081                        fibers.push(fiber);
4082                    }
4083                }
4084            }
4085        }
4086
4087        if let Some(fiber) = self.worker.take() {
4088            fibers.push(fiber);
4089        }
4090
4091        let mut take_items = |list| {
4092            for item in mem::take(list) {
4093                match item {
4094                    WorkItem::ResumeFiber(fiber) => {
4095                        fibers.push(fiber);
4096                    }
4097                    WorkItem::PushFuture(future) => {
4098                        self.futures
4099                            .get_mut()
4100                            .as_mut()
4101                            .unwrap()
4102                            .push(future.into_inner());
4103                    }
4104                    _ => {}
4105                }
4106            }
4107        };
4108
4109        take_items(&mut self.high_priority);
4110        take_items(&mut self.low_priority);
4111
4112        if let Some(them) = self.futures.get_mut().take() {
4113            futures.push(them);
4114        }
4115    }
4116
4117    fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4118        self.instance_states.entry(instance).or_default()
4119    }
4120
4121    fn push<V: Send + Sync + 'static>(
4122        &mut self,
4123        value: V,
4124    ) -> Result<TableId<V>, ResourceTableError> {
4125        self.table.get_mut().push(value).map(TableId::from)
4126    }
4127
4128    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4129        self.table.get_mut().get_mut(&Resource::from(id))
4130    }
4131
4132    pub fn add_child<T: 'static, U: 'static>(
4133        &mut self,
4134        child: TableId<T>,
4135        parent: TableId<U>,
4136    ) -> Result<(), ResourceTableError> {
4137        self.table
4138            .get_mut()
4139            .add_child(Resource::from(child), Resource::from(parent))
4140    }
4141
4142    pub fn remove_child<T: 'static, U: 'static>(
4143        &mut self,
4144        child: TableId<T>,
4145        parent: TableId<U>,
4146    ) -> Result<(), ResourceTableError> {
4147        self.table
4148            .get_mut()
4149            .remove_child(Resource::from(child), Resource::from(parent))
4150    }
4151
4152    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4153        self.table.get_mut().delete(Resource::from(id))
4154    }
4155
4156    fn push_future(&mut self, future: HostTaskFuture) {
4157        // Note that we can't directly push to `ConcurrentState::futures` here
4158        // since this may be called from a future that's being polled inside
4159        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4160        // so it has exclusive access while polling it.  Therefore, we push a
4161        // work item to the "high priority" queue, which will actually push to
4162        // `ConcurrentState::futures` later.
4163        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4164    }
4165
4166    fn push_high_priority(&mut self, item: WorkItem) {
4167        log::trace!("push high priority: {item:?}");
4168        self.high_priority.push(item);
4169    }
4170
4171    fn push_low_priority(&mut self, item: WorkItem) {
4172        log::trace!("push low priority: {item:?}");
4173        self.low_priority.push(item);
4174    }
4175
4176    /// Determine whether the instance associated with the specified guest task
4177    /// may be entered (i.e. is not already on the async call stack).
4178    ///
4179    /// This is an additional check on top of the "may_enter" instance flag;
4180    /// it's needed because async-lifted exports with callback functions must
4181    /// not call their own instances directly or indirectly, and due to the
4182    /// "stackless" nature of callback-enabled guest tasks this may happen even
4183    /// if there are no activation records on the stack (i.e. the "may_enter"
4184    /// field is `true`) for that instance.
4185    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4186        let guest_instance = self.get_mut(guest_task).unwrap().instance;
4187
4188        // Walk the task tree back to the root, looking for potential
4189        // reentrance.
4190        //
4191        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4192        // such that each bit represents and instance which has been entered by
4193        // that task or an ancestor of that task, in which case this would be a
4194        // constant time check.
4195        loop {
4196            match &self.get_mut(guest_task).unwrap().caller {
4197                Caller::Host { .. } => break true,
4198                Caller::Guest { task, instance } => {
4199                    if *instance == guest_instance {
4200                        break false;
4201                    } else {
4202                        guest_task = *task;
4203                    }
4204                }
4205            }
4206        }
4207    }
4208
4209    /// Record that we're about to enter a (sub-)component instance which does
4210    /// not support more than one concurrent, stackful activation, meaning it
4211    /// cannot be entered again until the next call returns.
4212    fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4213        self.instance_state(instance).do_not_enter = true;
4214    }
4215
4216    /// Record that we've exited a (sub-)component instance previously entered
4217    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
4218    /// See the documentation for the latter for details.
4219    fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4220        self.instance_state(instance).do_not_enter = false;
4221        self.partition_pending(instance)
4222    }
4223
4224    /// Iterate over `InstanceState::pending`, moving any ready items into the
4225    /// "high priority" work item queue.
4226    ///
4227    /// See `GuestCall::is_ready` for details.
4228    fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4229        for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4230            let call = GuestCall { task, kind };
4231            if call.is_ready(self)? {
4232                self.push_high_priority(WorkItem::GuestCall(call));
4233            } else {
4234                self.instance_state(instance)
4235                    .pending
4236                    .insert(call.task, call.kind);
4237            }
4238        }
4239
4240        Ok(())
4241    }
4242
4243    /// Implements the `backpressure.{set,inc,dec}` intrinsics.
4244    pub(crate) fn backpressure_modify(
4245        &mut self,
4246        caller_instance: RuntimeComponentInstanceIndex,
4247        modify: impl FnOnce(u16) -> Option<u16>,
4248    ) -> Result<()> {
4249        let state = self.instance_state(caller_instance);
4250        let old = state.backpressure;
4251        let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4252        state.backpressure = new;
4253
4254        if old > 0 && new == 0 {
4255            // Backpressure was previously enabled and is now disabled; move any
4256            // newly-eligible guest calls to the "high priority" queue.
4257            self.partition_pending(caller_instance)?;
4258        }
4259
4260        Ok(())
4261    }
4262
4263    /// Implements the `context.get` intrinsic.
4264    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4265        let task = self.guest_task.unwrap();
4266        let val = self.get_mut(task)?.context[usize::try_from(slot).unwrap()];
4267        log::trace!("context_get {task:?} slot {slot} val {val:#x}");
4268        Ok(val)
4269    }
4270
4271    /// Implements the `context.set` intrinsic.
4272    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4273        let task = self.guest_task.unwrap();
4274        log::trace!("context_set {task:?} slot {slot} val {val:#x}");
4275        self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
4276        Ok(())
4277    }
4278}
4279
4280/// Provide a type hint to compiler about the shape of a parameter lower
4281/// closure.
4282fn for_any_lower<
4283    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4284>(
4285    fun: F,
4286) -> F {
4287    fun
4288}
4289
4290/// Provide a type hint to compiler about the shape of a result lift closure.
4291fn for_any_lift<
4292    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4293>(
4294    fun: F,
4295) -> F {
4296    fun
4297}
4298
4299/// Wrap the specified future in a `poll_fn` which asserts that the future is
4300/// only polled from the event loop of the specified `Store`.
4301///
4302/// See `StoreContextMut::run_concurrent` for details.
4303fn checked<F: Future + Send + 'static>(
4304    id: StoreId,
4305    fut: F,
4306) -> impl Future<Output = F::Output> + Send + 'static {
4307    async move {
4308        let mut fut = pin!(fut);
4309        future::poll_fn(move |cx| {
4310            let message = "\
4311                `Future`s which depend on asynchronous component tasks, streams, or \
4312                futures to complete may only be polled from the event loop of the \
4313                store to which they belong.  Please use \
4314                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4315            ";
4316            tls::try_get(|store| {
4317                let matched = match store {
4318                    tls::TryGet::Some(store) => store.id() == id,
4319                    tls::TryGet::Taken | tls::TryGet::None => false,
4320                };
4321
4322                if !matched {
4323                    panic!("{message}")
4324                }
4325            });
4326            fut.as_mut().poll(cx)
4327        })
4328        .await
4329    }
4330}
4331
4332/// Assert that `StoreContextMut::run_concurrent` has not been called from
4333/// within an store's event loop.
4334fn check_recursive_run() {
4335    tls::try_get(|store| {
4336        if !matches!(store, tls::TryGet::None) {
4337            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4338        }
4339    });
4340}
4341
/// Split a packed callback return value into two parts: the low four bits
/// and the remaining upper bits (shifted down by four), in that order.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
4345
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the call.
    options: OptionsIndex,
    // NOTE(review): presumably the guest address to which the event payload
    // is written -- confirm against `waitable_check`.
    payload: u32,
}
4354
/// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
enum WaitableCheck {
    /// Parameters for a `waitable-set.wait` call.
    Wait(WaitableCheckParams),
    /// Parameters for a `waitable-set.poll` call.
    Poll(WaitableCheckParams),
    // NOTE(review): carries no parameters; presumably represents a yield
    // rather than a wait or poll on a set -- confirm against
    // `waitable_check`.
    Yield,
}
4361
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// Pass this to `queue_call` to enqueue the call; `R` is the type to which
/// the lifted result will be downcast.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest task created by `prepare_call`
    task: TableId<GuestTask>,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The `oneshot::Receiver` which will resolve when the task -- and any
    /// transitive subtasks -- have all exited.
    exit_rx: oneshot::Receiver<()>,
    /// Ties this value to the result type `R` without storing one.
    _phantom: PhantomData<R>,
}
4378
4379impl<R> PreparedCall<R> {
4380    /// Get a copy of the `TaskId` for this `PreparedCall`.
4381    pub(crate) fn task_id(&self) -> TaskId {
4382        TaskId { task: self.task }
4383    }
4384}
4385
/// Represents a task created by `prepare_call`.
///
/// Obtained via `PreparedCall::task_id`; see `TaskId::remove` for its use.
pub(crate) struct TaskId {
    /// Table ID of the underlying guest task.
    task: TableId<GuestTask>,
}
4390
4391impl TaskId {
4392    /// Remove the specified task from the concurrent state to which it belongs.
4393    ///
4394    /// This must be used with care to avoid use-after-delete or double-delete
4395    /// bugs.  Specifically, it should only be called on tasks created with the
4396    /// `remove_task_automatically` parameter to `prepare_call` set to `false`,
4397    /// which tells the runtime that the caller is responsible for removing the
4398    /// task from the state; otherwise, it will be removed automatically.  Also,
4399    /// it should only be called once for a given task, and only after either
4400    /// the task has completed or the instance has trapped.
4401    pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
4402        Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())
4403    }
4404}
4405
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// This creates a new host-called `GuestTask`, adds it to the store's
/// concurrent state, and returns a `PreparedCall` holding the receiving ends
/// of the channels on which the result and the exit notification will be
/// delivered.  Nothing is scheduled yet.
///
/// - `param_count`: the number of lowered core Wasm parameters; stored in the
///   returned `PreparedCall`.
/// - `remove_task_automatically`: see `TaskId::remove`.
/// - `call_post_return_automatically`: recorded in the task's `Caller::Host`
///   and forwarded to `call_callback` when the export has a callback.
/// - `lower_params` / `lift_result`: invoked with `handle` and the store to
///   lower the parameters and lift the result, respectively.
///
/// Panics if a guest task is currently running in this store.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    remove_task_automatically: bool,
    call_post_return_automatically: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything we need from the function's ABI info up front, before
    // taking the mutable borrow of the concurrent state below.
    let instance = handle.instance().id().get(store.0);
    let task_return_type = instance.component().types()[ty].results;
    let component_instance = raw_options.instance;
    let callback = options.callback();
    let memory = options.memory_raw().map(SendSyncPtr::new);
    let string_encoding = options.string_encoding();
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    assert!(state.guest_task.is_none());

    // `tx`/`rx` carry the lifted result; `exit_tx`/`exit_rx` signal that the
    // task (and any transitive subtasks) have exited.
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();

    let mut task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            remove_task_automatically,
            call_post_return_automatically,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(
                move |store: &mut dyn VMStore, runtime_instance, event, handle| {
                    let store = token.as_context_mut(store);
                    // SAFETY: Per the contract of `prepare_call`, the callback
                    // will remain valid at least as long as this task exists.
                    unsafe {
                        instance.call_callback(
                            store,
                            runtime_instance,
                            callback,
                            event,
                            handle,
                            call_post_return_automatically,
                        )
                    }
                },
            ) as CallbackFn
        }),
        component_instance,
    )?;
    task.function_index = Some(handle.index());

    let task = state.push(task)?;

    Ok(PreparedCall {
        handle,
        task,
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
4497
4498/// Queue a call previously prepared using `prepare_call` to be run as part of
4499/// the associated `ComponentInstance`'s event loop.
4500///
4501/// The returned future will resolve to the result once it is available, but
4502/// must only be polled via the instance's event loop. See
4503/// `StoreContextMut::run_concurrent` for details.
4504pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
4505    mut store: StoreContextMut<T>,
4506    prepared: PreparedCall<R>,
4507) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
4508    let PreparedCall {
4509        handle,
4510        task,
4511        param_count,
4512        rx,
4513        exit_rx,
4514        ..
4515    } = prepared;
4516
4517    queue_call0(store.as_context_mut(), handle, task, param_count)?;
4518
4519    Ok(checked(
4520        store.0.id(),
4521        rx.map(move |result| {
4522            result
4523                .map(|v| (*v.downcast().unwrap(), exit_rx))
4524                .map_err(anyhow::Error::from)
4525        }),
4526    ))
4527}
4528
4529/// Queue a call previously prepared using `prepare_call` to be run as part of
4530/// the associated `ComponentInstance`'s event loop.
4531fn queue_call0<T: 'static>(
4532    store: StoreContextMut<T>,
4533    handle: Func,
4534    guest_task: TableId<GuestTask>,
4535    param_count: usize,
4536) -> Result<()> {
4537    let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
4538    let is_concurrent = raw_options.async_;
4539    let instance = handle.instance();
4540    let callee = handle.lifted_core_func(store.0);
4541    let callback = options.callback();
4542    let post_return = handle.post_return_core_func(store.0);
4543
4544    log::trace!("queueing call {guest_task:?}");
4545
4546    let instance_flags = if callback.is_none() {
4547        None
4548    } else {
4549        Some(flags)
4550    };
4551
4552    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
4553    // (with signatures appropriate for this call) and will remain valid as
4554    // long as this instance is valid.
4555    unsafe {
4556        instance.queue_call(
4557            store,
4558            guest_task,
4559            SendSyncPtr::new(callee),
4560            param_count,
4561            1,
4562            instance_flags,
4563            is_concurrent,
4564            callback.map(SendSyncPtr::new),
4565            post_return.map(SendSyncPtr::new),
4566        )
4567    }
4568}