Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
21//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22//! async-lifted or sync-lifted import, creating a guest task.
23//!
24//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25//! instance, allowing any and all tasks belonging to that instance to make
26//! progress.
27//!
28//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29//! for the specified instance.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61#[cfg(feature = "gc")]
62use crate::module::ModuleRegistry;
63use crate::prelude::*;
64use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
65#[cfg(feature = "gc")]
66use crate::vm::GcRootsList;
67use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
68use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMLazyThread, VMMemoryDefinition, VMStore};
69use crate::{
70    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
71};
72use alloc::borrow::ToOwned;
73use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
74use core::any::Any;
75use core::cell::UnsafeCell;
76use core::fmt;
77use core::future;
78use core::future::Future;
79use core::marker::PhantomData;
80use core::mem::{self, ManuallyDrop, MaybeUninit};
81use core::ops::DerefMut;
82use core::pin::{Pin, pin};
83use core::ptr::{self, NonNull};
84use core::task::{Context, Poll, Waker};
85use futures::channel::oneshot;
86use futures::stream::{FuturesUnordered, StreamExt};
87use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
88use table::{TableDebug, TableId};
89use wasmtime_environ::component::{
90    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
91    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
98#[cfg(feature = "gc")]
99use wasmtime_unwinder::Unwind;
100
101pub use abort::JoinHandle;
102pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
103pub use future_stream_any::{FutureAny, StreamAny};
104pub use futures_and_streams::{
105    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
106    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
107    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
108};
109pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
110
111mod abort;
112mod error_contexts;
113mod func;
114mod future_stream_any;
115mod futures_and_streams;
116pub(crate) mod table;
117pub(crate) mod tls;
118
119/// Constant defined in the Component Model spec to indicate that the async
120/// intrinsic (e.g. `future.write`) has not yet completed.
121const BLOCKED: u32 = 0xffff_ffff;
122
123/// Corresponds to `CallState` in the upstream spec.
124#[derive(Clone, Copy, Eq, PartialEq, Debug)]
125pub enum Status {
126    Starting = 0,
127    Started = 1,
128    Returned = 2,
129    StartCancelled = 3,
130    ReturnCancelled = 4,
131}
132
133impl Status {
134    /// Packs this status and the optional `waitable` provided into a 32-bit
135    /// result that the canonical ABI requires.
136    ///
137    /// The low 4 bits are reserved for the status while the upper 28 bits are
138    /// the waitable, if present.
139    pub fn pack(self, waitable: Option<u32>) -> u32 {
140        assert!(matches!(self, Status::Returned) == waitable.is_none());
141        let waitable = waitable.unwrap_or(0);
142        assert!(waitable < (1 << 28));
143        (waitable << 4) | (self as u32)
144    }
145}
146
147/// Corresponds to `EventCode` in the Component Model spec, plus related payload
148/// data.
149#[derive(Clone, Copy, Debug)]
150enum Event {
151    None,
152    Subtask {
153        status: Status,
154    },
155    StreamRead {
156        code: ReturnCode,
157        pending: Option<(TypeStreamTableIndex, u32)>,
158    },
159    StreamWrite {
160        code: ReturnCode,
161        pending: Option<(TypeStreamTableIndex, u32)>,
162    },
163    FutureRead {
164        code: ReturnCode,
165        pending: Option<(TypeFutureTableIndex, u32)>,
166    },
167    FutureWrite {
168        code: ReturnCode,
169        pending: Option<(TypeFutureTableIndex, u32)>,
170    },
171    Cancelled,
172}
173
174impl Event {
175    /// Lower this event to core Wasm integers for delivery to the guest.
176    ///
177    /// Note that the waitable handle, if any, is assumed to be lowered
178    /// separately.
179    fn parts(self) -> (u32, u32) {
180        const EVENT_NONE: u32 = 0;
181        const EVENT_SUBTASK: u32 = 1;
182        const EVENT_STREAM_READ: u32 = 2;
183        const EVENT_STREAM_WRITE: u32 = 3;
184        const EVENT_FUTURE_READ: u32 = 4;
185        const EVENT_FUTURE_WRITE: u32 = 5;
186        const EVENT_CANCELLED: u32 = 6;
187        match self {
188            Event::None => (EVENT_NONE, 0),
189            Event::Cancelled => (EVENT_CANCELLED, 0),
190            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
191            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
192            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
193            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
194            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
195        }
196    }
197}
198
199/// Corresponds to `CallbackCode` in the spec.
200mod callback_code {
201    pub const EXIT: u32 = 0;
202    pub const YIELD: u32 = 1;
203    pub const WAIT: u32 = 2;
204}
205
206/// A flag indicating that the callee is an async-lowered export.
207///
208/// This may be passed to the `async-start` intrinsic from a fused adapter.
209const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
210
211/// Provides access to either store data (via the `get` method) or the store
212/// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
213/// instance to which the current host task belongs.
214///
215/// See [`Accessor::with`] for details.
216pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
217    store: StoreContextMut<'a, T>,
218    get_data: fn(&mut T) -> D::Data<'_>,
219}
220
221impl<'a, T, D> Access<'a, T, D>
222where
223    D: HasData + ?Sized,
224    T: 'static,
225{
226    /// Creates a new [`Access`] from its component parts.
227    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
228        Self { store, get_data }
229    }
230
231    /// Get mutable access to the store data.
232    pub fn data_mut(&mut self) -> &mut T {
233        self.store.data_mut()
234    }
235
236    /// Get mutable access to the store data.
237    pub fn get(&mut self) -> D::Data<'_> {
238        (self.get_data)(self.data_mut())
239    }
240
241    /// Spawn a background task.
242    ///
243    /// See [`Accessor::spawn`] for details.
244    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
245    where
246        T: 'static,
247    {
248        let accessor = Accessor {
249            get_data: self.get_data,
250            token: StoreToken::new(self.store.as_context_mut()),
251        };
252        self.store
253            .as_context_mut()
254            .spawn_with_accessor(accessor, task)
255    }
256
257    /// Returns the getter this accessor is using to project from `T` into
258    /// `D::Data`.
259    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
260        self.get_data
261    }
262}
263
264impl<'a, T, D> AsContext for Access<'a, T, D>
265where
266    D: HasData + ?Sized,
267    T: 'static,
268{
269    type Data = T;
270
271    fn as_context(&self) -> StoreContext<'_, T> {
272        self.store.as_context()
273    }
274}
275
276impl<'a, T, D> AsContextMut for Access<'a, T, D>
277where
278    D: HasData + ?Sized,
279    T: 'static,
280{
281    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
282        self.store.as_context_mut()
283    }
284}
285
286/// Provides scoped mutable access to store data in the context of a concurrent
287/// host task future.
288///
289/// This allows multiple host task futures to execute concurrently and access
290/// the store between (but not across) `await` points.
291///
292/// # Rationale
293///
294/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
295/// `D::Data<'_>`. The problem this is solving, however, is that it does not
296/// literally store these values. The basic problem is that when a concurrent
297/// host future is being polled it has access to `&mut T` (and the whole
298/// `Store`) but when it's not being polled it does not have access to these
299/// values. This reflects how the store is only ever polling one future at a
300/// time so the store is effectively being passed between futures.
301///
302/// Rust's `Future` trait, however, has no means of passing a `Store`
303/// temporarily between futures. The [`Context`](core::task::Context) type does
304/// not have the ability to attach arbitrary information to it at this time.
305/// This type, [`Accessor`], is used to bridge this expressivity gap.
306///
307/// The [`Accessor`] type here represents the ability to acquire, temporarily in
308/// a synchronous manner, the current store. The [`Accessor::with`] function
309/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
310/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
311/// not take an `async` closure as its argument, instead it's a synchronous
312/// closure which must complete during on run of `Future::poll`. This reflects
313/// how the store is temporarily made available while a host future is being
314/// polled.
315///
316/// # Implementation
317///
318/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
319/// this type additionally doesn't even have a lifetime parameter. This is
320/// instead a representation of proof of the ability to acquire these while a
321/// future is being polled. Wasmtime will, when it polls a host future,
322/// configure ambient state such that the `Accessor` that a future closes over
323/// will work and be able to access the store.
324///
325/// This has a number of implications for users such as:
326///
327/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
328///   the lifetime of a single future.
329/// * A future is expected to, however, close over an `Accessor` and keep it
330///   alive probably for the duration of the entire future.
331/// * Different host futures will be given different `Accessor`s, and that's
332///   intentional.
333/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
334///   alleviates some otherwise required bounds to be written down.
335///
336/// # Using `Accessor` in `Drop`
337///
338/// The methods on `Accessor` are only expected to work in the context of
339/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
340/// host future can be dropped at any time throughout the system and Wasmtime
341/// store context is not necessarily available at that time. It's recommended to
342/// not use `Accessor` methods in anything connected to a `Drop` implementation
343/// as they will panic and have unintended results. If you run into this though
344/// feel free to file an issue on the Wasmtime repository.
345pub struct Accessor<T: 'static, D = HasSelf<T>>
346where
347    D: HasData + ?Sized,
348{
349    token: StoreToken<T>,
350    get_data: fn(&mut T) -> D::Data<'_>,
351}
352
353/// A helper trait to take any type of accessor-with-data in functions.
354///
355/// This trait is similar to [`AsContextMut`] except that it's used when
356/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
357/// [`Accessor`] is the main type used in concurrent settings and is passed to
358/// functions such as [`Func::call_concurrent`].
359///
360/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
361/// this trait. This effectively means that regardless of the `D` in
362/// `Accessor<T, D>` it can still be passed to a function which just needs a
363/// store accessor.
364///
365/// Acquiring an [`Accessor`] can be done through
366/// [`StoreContextMut::run_concurrent`] for example or in a host function
367/// through
368/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
369pub trait AsAccessor {
370    /// The `T` in `Store<T>` that this accessor refers to.
371    type Data: 'static;
372
373    /// The `D` in `Accessor<T, D>`, or the projection out of
374    /// `Self::Data`.
375    type AccessorData: HasData + ?Sized;
376
377    /// Returns the accessor that this is referring to.
378    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
379}
380
381impl<T: AsAccessor + ?Sized> AsAccessor for &T {
382    type Data = T::Data;
383    type AccessorData = T::AccessorData;
384
385    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
386        T::as_accessor(self)
387    }
388}
389
390impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
391    type Data = T;
392    type AccessorData = D;
393
394    fn as_accessor(&self) -> &Accessor<T, D> {
395        self
396    }
397}
398
399// Note that it is intentional at this time that `Accessor` does not actually
400// store `&mut T` or anything similar. This distinctly enables the `Accessor`
401// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
402// that matter). This is used to ergonomically simplify bindings where the
403// majority of the time `Accessor` is closed over in a future which then needs
404// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
405// you already have to write `T: 'static`...) it helps to avoid this.
406//
407// Note as well that `Accessor` doesn't actually store its data at all. Instead
408// it's more of a "proof" of what can be accessed from TLS. API design around
409// `Accessor` and functions like `Linker::func_wrap_concurrent` are
410// intentionally made to ensure that `Accessor` is ideally only used in the
411// context that TLS variables are actually set. For example host functions are
412// given `&Accessor`, not `Accessor`, and this prevents them from persisting
413// the value outside of a future. Within the future the TLS variables are all
414// guaranteed to be set while the future is being polled.
415//
416// Finally though this is not an ironclad guarantee, but nor does it need to be.
417// The TLS APIs are designed to panic or otherwise model usage where they're
418// called recursively or similar. It's hoped that code cannot be constructed to
419// actually hit this at runtime but this is not a safety requirement at this
420// time.
421const _: () = {
422    const fn assert<T: Send + Sync>() {}
423    assert::<Accessor<UnsafeCell<u32>>>();
424};
425
426impl<T> Accessor<T> {
427    /// Creates a new `Accessor` backed by the specified functions.
428    ///
429    /// - `get`: used to retrieve the store
430    ///
431    /// - `get_data`: used to "project" from the store's associated data to
432    /// another type (e.g. a field of that data or a wrapper around it).
433    ///
434    /// - `spawn`: used to queue spawned background tasks to be run later
435    pub(crate) fn new(token: StoreToken<T>) -> Self {
436        Self {
437            token,
438            get_data: |x| x,
439        }
440    }
441}
442
443impl<T, D> Accessor<T, D>
444where
445    D: HasData + ?Sized,
446{
447    /// Run the specified closure, passing it mutable access to the store.
448    ///
449    /// This function is one of the main building blocks of the [`Accessor`]
450    /// type. This yields synchronous, blocking, access to the store via an
451    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
452    /// providing the ability to access `D` via [`Access::get`]. Note that the
453    /// `fun` here is given only temporary access to the store and `T`/`D`
454    /// meaning that the return value `R` here is not allowed to capture borrows
455    /// into the two. If access is needed to data within `T` or `D` outside of
456    /// this closure then it must be `clone`d out, for example.
457    ///
458    /// # Panics
459    ///
460    /// This function will panic if it is call recursively with any other
461    /// accessor already in scope. For example if `with` is called within `fun`,
462    /// then this function will panic. It is up to the embedder to ensure that
463    /// this does not happen.
464    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465        tls::get(|vmstore| {
466            fun(Access {
467                store: self.token.as_context_mut(vmstore),
468                get_data: self.get_data,
469            })
470        })
471    }
472
473    /// Returns the getter this accessor is using to project from `T` into
474    /// `D::Data`.
475    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
476        self.get_data
477    }
478
479    /// Changes this accessor to access `D2` instead of the current type
480    /// parameter `D`.
481    ///
482    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
483    ///
484    /// # Panics
485    ///
486    /// When using this API the returned value is disconnected from `&self` and
487    /// the lifetime binding the `self` argument. An `Accessor` only works
488    /// within the context of the closure or async closure that it was
489    /// originally given to, however. This means that due to the fact that the
490    /// returned value has no lifetime connection it's possible to use the
491    /// accessor outside of `&self`, the original accessor, and panic.
492    ///
493    /// The returned value should only be used within the scope of the original
494    /// `Accessor` that `self` refers to.
495    pub fn with_getter<D2: HasData>(
496        &self,
497        get_data: fn(&mut T) -> D2::Data<'_>,
498    ) -> Accessor<T, D2> {
499        Accessor {
500            token: self.token,
501            get_data,
502        }
503    }
504
505    /// Spawn a background task which will receive an `&Accessor<T, D>` and
506    /// run concurrently with any other tasks in progress for the current
507    /// store.
508    ///
509    /// This is particularly useful for host functions which return a `stream`
510    /// or `future` such that the code to write to the write end of that
511    /// `stream` or `future` must run after the function returns.
512    ///
513    /// The returned [`JoinHandle`] may be used to cancel the task.
514    ///
515    /// # Panics
516    ///
517    /// Panics if called within a closure provided to the [`Accessor::with`]
518    /// function. This can only be called outside an active invocation of
519    /// [`Accessor::with`].
520    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
521    where
522        T: 'static,
523    {
524        let accessor = self.clone_for_spawn();
525        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
526    }
527
528    fn clone_for_spawn(&self) -> Self {
529        Self {
530            token: self.token,
531            get_data: self.get_data,
532        }
533    }
534
535    /// Polls to see if this store contains any "interesting" tasks still within
536    /// it.
537    ///
538    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
539    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
540    /// the last remaining "interesting" task has exited the provided context's
541    /// waker will be notified. Note that only the waker passed to the last call
542    /// to `poll_no_interesting_tasks` for the store will be notified, so this
543    /// is only appropriate to use once-at-a-time per store.
544    ///
545    /// The component model specification, as of this current date, does not
546    /// have a distinction between "interesting" tasks and not. The current
547    /// intention is that in a future revision of the component model this will
548    /// be distinguished at the component ABI level where tasks will be able to
549    /// flag themselves as "interesting" optionally. Additionally extra work can
550    /// be opted-in to being "interesting".
551    ///
552    /// For now what this means is that all component model tasks within this
553    /// store are considered interesting. This specifically includes the entire
554    /// duration of a task, so even all of the time after a task has returned
555    /// but before it has exited. This means that this function is, today,
556    /// effectively a proxy for "are there any more tasks still running in this
557    /// store". This can be used by embedders to determine whether there's any
558    /// more work going on, even in the background, for a particular guest.
559    /// Hosts can use this as a signal that the guest wants to stay alive a
560    /// little longer, even after a task has returned.
561    ///
562    /// In the future this predicate won't include all tasks in this store. Some
563    /// tasks will be able to flag themselves as not interesting, meaning that
564    /// when this returns ready it'd be possible that there are still tasks
565    /// remaining in the store.
566    ///
567    /// Note that at this time spawned threads within a task are always
568    /// considered uninteresting. If this function returns ready, then spawned
569    /// threads may still be in the store.
570    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
571        self.with(|mut access| {
572            let store = access.as_context_mut().0;
573            let state = store.concurrent_state_mut_without_forcing_current_thread();
574            if state.interesting_tasks == 0 {
575                Poll::Ready(())
576            } else {
577                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
578                Poll::Pending
579            }
580        })
581    }
582
583    /// Poll to see if the component instance corresponding to the specified
584    /// function is ready to run a concurrent call without queuing it (i.e. does
585    /// not have backpressure enabled and does not have a sync call in
586    /// progress).
587    ///
588    /// Returns `Poll::Ready(())` if the component instance is ready to run a
589    /// concurrent call, and otherwise returns `Poll::Pending`.  If pending is
590    /// returned then whenever the instance becomes ready for a call the
591    /// provided context's waker will be notified.  Note that only the waker
592    /// passed to the last call to `poll_ready_for_concurrent_call` for the
593    /// store will be notified (regardless of whether the same or different
594    /// `Func` is specified relative to earlier calls), so this is only
595    /// appropriate to use once-at-a-time per store.  Also note that the waker
596    /// may be notified when _any_ instance becomes callable (i.e. not
597    /// necessarily the last one polled), so this function must be called again
598    /// to determine if the instance of interest is ready.
599    pub fn poll_ready_for_concurrent_call(&self, func: Func, cx: &mut Context<'_>) -> Poll<()> {
600        self.with(|mut access| {
601            let store = access.as_context_mut().0;
602            let (_, _, _, raw_options) = func.abi_info(store);
603            let instance = func.instance().runtime_instance(raw_options.instance);
604            let state = store.instance_state(instance).concurrent_state();
605            if state.backpressure == 0 {
606                Poll::Ready(())
607            } else {
608                store
609                    .concurrent_state_mut_without_forcing_current_thread()
610                    .ready_for_concurrent_call_waker = Some(cx.waker().clone());
611                Poll::Pending
612            }
613        })
614    }
615}
616
617/// Represents a task which may be provided to `Accessor::spawn`,
618/// `Accessor::forward`, or `StorecContextMut::spawn`.
619// TODO: Replace this with `core::ops::AsyncFnOnce` when that becomes a viable
620// option.
621//
622// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
623// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
624// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
625// Send + Sync + 'static` fails with a type mismatch error when we try to pass
626// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
627// best we can do for the time being.
628pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
629where
630    D: HasData + ?Sized,
631{
632    /// Run the task.
633    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
634}
635
636/// Represents parameter and result metadata for the caller side of a
637/// guest->guest call orchestrated by a fused adapter.
638enum CallerInfo {
639    /// Metadata for a call to an async-lowered import
640    Async {
641        params: Vec<ValRaw>,
642        has_result: bool,
643    },
644    /// Metadata for a call to an sync-lowered import
645    Sync {
646        params: Vec<ValRaw>,
647        result_count: u32,
648    },
649}
650
651/// Indicates how a guest task is waiting on a waitable set.
652enum WaitMode {
653    /// The guest task is waiting using `task.wait`
654    Fiber(StoreFiber<'static>),
655    /// The guest task is waiting via a callback declared as part of an
656    /// async-lifted export.
657    Callback(Instance),
658}
659
660/// Represents the reason a fiber is suspending itself.
661#[derive(Debug)]
662enum SuspendReason {
663    /// The fiber is waiting for an event to be delivered to the specified
664    /// waitable set or task.
665    Waiting {
666        set: TableId<WaitableSet>,
667        thread: QualifiedThreadId,
668        skip_may_block_check: bool,
669    },
670    /// The fiber has finished handling its most recent work item and is waiting
671    /// for another (or to be dropped if it is no longer needed).
672    NeedWork,
673    /// The fiber is yielding and should be resumed once other tasks have had a
674    /// chance to run.
675    Yielding {
676        thread: QualifiedThreadId,
677        cancellable: bool,
678        skip_may_block_check: bool,
679    },
680    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
681    ExplicitlySuspending {
682        thread: QualifiedThreadId,
683        skip_may_block_check: bool,
684    },
685}
686
687/// Represents a pending call into guest code for a given guest task.
688enum GuestCallKind {
689    /// Indicates there's an event to deliver to the task, possibly related to a
690    /// waitable set the task has been waiting on or polling.
691    DeliverEvent {
692        /// The instance to which the task belongs.
693        instance: Instance,
694        /// The waitable set the event belongs to, if any.
695        ///
696        /// If this is `None` the event will be waiting in the
697        /// `GuestTask::event` field for the task.
698        set: Option<TableId<WaitableSet>>,
699    },
700    /// Indicates that a new guest task call is pending and may be executed
701    /// using the specified closure.
702    ///
703    /// If the closure returns `Ok(Some(call))`, the `call` should be run
704    /// immediately using `handle_guest_call`.
705    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
706    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
707}
708
709impl fmt::Debug for GuestCallKind {
710    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
711        match self {
712            Self::DeliverEvent { instance, set } => f
713                .debug_struct("DeliverEvent")
714                .field("instance", instance)
715                .field("set", set)
716                .finish(),
717            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
718            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
719        }
720    }
721}
722
723/// The target of a suspension intrinsic.
724#[derive(Copy, Clone, Debug)]
725pub enum SuspensionTarget {
726    SomeSuspended(u32),
727    Some(u32),
728    None,
729}
730
731impl SuspensionTarget {
732    fn is_none(&self) -> bool {
733        matches!(self, SuspensionTarget::None)
734    }
735    fn is_some(&self) -> bool {
736        !self.is_none()
737    }
738}
739
740/// Represents a pending call into guest code for a given guest thread.
741#[derive(Debug)]
742struct GuestCall {
743    thread: QualifiedThreadId,
744    kind: GuestCallKind,
745}
746
747impl GuestCall {
748    /// Returns whether or not the call is ready to run.
749    ///
750    /// A call will not be ready to run if either:
751    ///
752    /// - the (sub-)component instance to be called has already been entered and
753    /// cannot be reentered until an in-progress call completes
754    ///
755    /// - the call is for a not-yet started task and the (sub-)component
756    /// instance to be called has backpressure enabled
757    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
758        let instance = store
759            .concurrent_state_mut()?
760            .get_mut(self.thread.task)?
761            .instance;
762        let state = store.instance_state(instance).concurrent_state();
763
764        let ready = match &self.kind {
765            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
766            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
767            GuestCallKind::StartExplicit(_) => true,
768        };
769        log::trace!(
770            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
771            state.do_not_enter,
772            state.backpressure
773        );
774        Ok(ready)
775    }
776}
777
778/// Job to be run on a worker fiber.
779enum WorkerItem {
780    GuestCall(GuestCall),
781    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
782}
783
784/// Represents a pending work item to be handled by the event loop for a given
785/// component instance.
786enum WorkItem {
787    /// A host task to be pushed to `ConcurrentState::futures`.
788    PushFuture(AlwaysMut<HostTaskFuture>),
789    /// A fiber to resume.
790    ResumeFiber(StoreFiber<'static>),
791    /// A thread to resume.
792    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
793    /// A pending call into guest code for a given guest task.
794    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
795    /// A job to run on a worker fiber.
796    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
797}
798
799impl fmt::Debug for WorkItem {
800    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
801        match self {
802            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
803            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
804            Self::ResumeThread(instance, thread) => f
805                .debug_tuple("ResumeThread")
806                .field(instance)
807                .field(thread)
808                .finish(),
809            Self::GuestCall(instance, call) => f
810                .debug_tuple("GuestCall")
811                .field(instance)
812                .field(call)
813                .finish(),
814            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
815        }
816    }
817}
818
819/// Whether a suspension intrinsic was cancelled or completed
820#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
821pub(crate) enum WaitResult {
822    Cancelled,
823    Completed,
824}
825
826/// Poll the specified future until it completes on behalf of a guest->host call
827/// using a sync-lowered import.
828///
829/// This is similar to `Instance::first_poll` except it's for sync-lowered
830/// imports, meaning we don't need to handle cancellation and we can block the
831/// caller until the task completes, at which point the caller can handle
832/// lowering the result to the guest's stack and linear memory.
833pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
834    store: &mut dyn VMStore,
835    future: impl Future<Output = Result<R>> + Send + 'static,
836) -> Result<R> {
837    let task = store.current_host_thread()?;
838
839    // Wrap the future in a closure which will take care of stashing the result
840    // in `GuestTask::result` and resuming this fiber when the host task
841    // completes.
842    let mut future = Box::pin(async move {
843        let result = future.await?;
844        tls::get(move |store| {
845            let state = store.concurrent_state_mut()?;
846            let host_state = &mut state.get_mut(task)?.state;
847            assert!(matches!(host_state, HostTaskState::CalleeStarted));
848            *host_state = HostTaskState::CalleeFinished(Box::new(result));
849
850            Waitable::Host(task).set_event(
851                state,
852                Some(Event::Subtask {
853                    status: Status::Returned,
854                }),
855            )?;
856
857            Ok(())
858        })
859    }) as HostTaskFuture;
860
861    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
862    // add the future to `ConcurrentState::futures` and poll it automatically
863    // from the event loop if it doesn't complete immediately here.
864    let poll = tls::set(store, || {
865        future
866            .as_mut()
867            .poll(&mut Context::from_waker(&Waker::noop()))
868    });
869
870    match poll {
871        // It completed immediately; check the result and delete the task.
872        Poll::Ready(result) => result?,
873
874        // It did not complete immediately; add it to
875        // `ConcurrentState::futures` so it will be polled via the event loop;
876        // then use `GuestThread::sync_call_set` to wait for the task to
877        // complete, suspending the current fiber until it does so.
878        Poll::Pending => {
879            let state = store.concurrent_state_mut()?;
880            state.push_future(future);
881
882            let caller = state.get_mut(task)?.caller;
883            let set = state.get_mut(caller.thread)?.sync_call_set;
884            Waitable::Host(task).join(state, Some(set))?;
885
886            store.suspend(SuspendReason::Waiting {
887                set,
888                thread: caller,
889                skip_may_block_check: false,
890            })?;
891
892            // Remove the `task` from the `sync_call_set` to ensure that when
893            // this function returns and the task is deleted that there are no
894            // more lingering references to this host task.
895            Waitable::Host(task).join(store.concurrent_state_mut()?, None)?;
896        }
897    }
898
899    // Retrieve and return the result.
900    let host_state = &mut store.concurrent_state_mut()?.get_mut(task)?.state;
901    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
902        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
903            Ok(result) => *result,
904            Err(_) => bail_bug!("host task finished with wrong type of result"),
905        }),
906        _ => bail_bug!("unexpected host task state after completion"),
907    }
908}
909
910/// Execute the specified guest call.
911fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
912    let mut next = Some(call);
913    while let Some(call) = next.take() {
914        match call.kind {
915            GuestCallKind::DeliverEvent { instance, set } => {
916                let (event, waitable) =
917                    match instance.get_event(store, call.thread.task, set, true)? {
918                        Some(pair) => pair,
919                        None => bail_bug!("delivering non-present event"),
920                    };
921                let state = store.concurrent_state_mut()?;
922                let task = state.get_mut(call.thread.task)?;
923                let runtime_instance = task.instance;
924                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
925
926                log::trace!(
927                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
928                    call.thread,
929                );
930
931                let old_thread = store.set_thread(call.thread)?;
932                log::trace!(
933                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
934                    call.thread
935                );
936
937                store.enter_instance(runtime_instance);
938
939                let Some(callback) = store
940                    .concurrent_state_mut()?
941                    .get_mut(call.thread.task)?
942                    .callback
943                    .take()
944                else {
945                    bail_bug!("guest task callback field not present")
946                };
947
948                let code = callback(store, event, handle)?;
949
950                store
951                    .concurrent_state_mut()?
952                    .get_mut(call.thread.task)?
953                    .callback = Some(callback);
954
955                store.exit_instance(runtime_instance)?;
956
957                store.set_thread(old_thread)?;
958
959                next = instance.handle_callback_code(
960                    store,
961                    call.thread,
962                    runtime_instance.index,
963                    code,
964                )?;
965
966                log::trace!(
967                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
968                );
969            }
970            GuestCallKind::StartImplicit(fun) => {
971                next = fun(store)?;
972            }
973            GuestCallKind::StartExplicit(fun) => {
974                fun(store)?;
975            }
976        }
977    }
978
979    Ok(())
980}
981
982impl<T> Store<T> {
983    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
984    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
985    where
986        T: Send + 'static,
987    {
988        ensure!(
989            self.as_context().0.concurrency_support(),
990            "cannot use `run_concurrent` when Config::concurrency_support disabled",
991        );
992        self.as_context_mut().run_concurrent(fun).await
993    }
994
995    #[doc(hidden)]
996    pub fn assert_concurrent_state_empty(&mut self) {
997        self.as_context_mut().assert_concurrent_state_empty();
998    }
999
1000    #[doc(hidden)]
1001    pub fn concurrent_state_table_size(&mut self) -> usize {
1002        self.as_context_mut().concurrent_state_table_size()
1003    }
1004
1005    /// Convenience wrapper for [`StoreContextMut::spawn`].
1006    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> Result<JoinHandle>
1007    where
1008        T: 'static,
1009    {
1010        self.as_context_mut().spawn(task)
1011    }
1012}
1013
1014impl<T> StoreContextMut<'_, T> {
1015    /// Assert that all the relevant tables and queues in the concurrent state
1016    /// for this store are empty.
1017    ///
1018    /// This is for sanity checking in integration tests
1019    /// (e.g. `component-async-tests`) that the relevant state has been cleared
1020    /// after each test concludes.  This should help us catch leaks, e.g. guest
1021    /// tasks which haven't been deleted despite having completed and having
1022    /// been dropped by their supertasks.
1023    ///
1024    /// Only intended for use in Wasmtime's own testing.
1025    #[doc(hidden)]
1026    pub fn assert_concurrent_state_empty(self) {
1027        let store = self.0;
1028        store
1029            .store_data_mut()
1030            .components
1031            .assert_instance_states_empty();
1032        let state = store.concurrent_state_mut().unwrap();
1033        assert!(
1034            state.table.get_mut().is_empty(),
1035            "non-empty table: {:?}",
1036            state.table.get_mut()
1037        );
1038        assert!(state.high_priority.is_empty());
1039        assert!(state.low_priority.is_empty());
1040        assert!(state.unforced_current_thread.is_none());
1041        assert!(state.futures_mut().unwrap().is_empty());
1042        assert!(state.global_error_context_ref_counts.is_empty());
1043    }
1044
1045    /// Helper function to perform tests over the size of the concurrent state
1046    /// table which can be useful for detecting leaks.
1047    ///
1048    /// Only intended for use in Wasmtime's own testing.
1049    #[doc(hidden)]
1050    pub fn concurrent_state_table_size(&mut self) -> usize {
1051        self.0
1052            .concurrent_state_mut()
1053            .unwrap()
1054            .table
1055            .get_mut()
1056            .iter_mut()
1057            .count()
1058    }
1059
1060    /// Spawn a background task to run as part of this instance's event loop.
1061    ///
1062    /// The task will receive an `&Accessor<U>` and run concurrently with
1063    /// any other tasks in progress for the instance.
1064    ///
1065    /// Note that the task will only make progress if and when the event loop
1066    /// for this instance is run.
1067    ///
1068    /// The returned [`JoinHandle`] may be used to cancel the task.
1069    pub fn spawn(mut self, task: impl AccessorTask<T>) -> Result<JoinHandle>
1070    where
1071        T: 'static,
1072    {
1073        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1074        self.spawn_with_accessor(accessor, task)
1075    }
1076
1077    /// Internal implementation of `spawn` functions where a `store` is
1078    /// available along with an `Accessor`.
1079    fn spawn_with_accessor<D>(
1080        self,
1081        accessor: Accessor<T, D>,
1082        task: impl AccessorTask<T, D>,
1083    ) -> Result<JoinHandle>
1084    where
1085        T: 'static,
1086        D: HasData + ?Sized,
1087    {
1088        // Create an "abortable future" here where internally the future will
1089        // hook calls to poll and possibly spawn more background tasks on each
1090        // iteration.
1091        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1092        self.0
1093            .concurrent_state_mut()?
1094            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1095        Ok(handle)
1096    }
1097
1098    /// Run the specified closure `fun` to completion as part of this store's
1099    /// event loop.
1100    ///
1101    /// This will run `fun` as part of this store's event loop until it
1102    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1103    /// controlled access to the store and its data.
1104    ///
1105    /// This function can be used to invoke [`Func::call_concurrent`] for
1106    /// example within the async closure provided here.
1107    ///
1108    /// This function will unconditionally return an error if
1109    /// [`Config::concurrency_support`] is disabled.
1110    ///
1111    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1112    ///
1113    /// # Store-blocking behavior
1114    ///
1115    /// At this time there are certain situations in which the `Future` returned
1116    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1117    /// extended period of time, despite one or more `Waker::wake` events having
1118    /// occurred for the task to which it belongs.  This can manifest as the
1119    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1120    /// the `Store` being held by e.g. a blocking host function, preventing the
1121    /// `Future` from being polled. A canonical example of this is when the
1122    /// `fun` provided to this function attempts to set a timeout for an
1123    /// invocation of a wasm function. In this situation the async closure is
1124    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1125    /// to elapse. At this time this setup will not always work and the timeout
1126    /// may not reliably fire.
1127    ///
1128    /// This function will not block the current thread and as such is always
1129    /// suitable to run in an `async` context, but the current implementation of
1130    /// Wasmtime can lead to situations where a certain wasm computation is
1131    /// required to make progress the closure to make progress. This is an
1132    /// artifact of Wasmtime's historical implementation of `async` functions
1133    /// and is the topic of [#11869] and [#11870]. In the timeout example from
1134    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1135    /// progress for a readiness notification of (b) to get delivered.
1136    ///
1137    /// This effectively means that it's not possible to reliably perform a
1138    /// "select" operation within the `fun` closure, which timeouts for example
1139    /// are based on. Fixing this requires some relatively major refactoring
1140    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1141    /// that is intended to be fixed one day. In the meantime it's recommended
1142    /// to apply timeouts or such to the entire `run_concurrent` call itself
1143    /// rather than internally.
1144    ///
1145    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1146    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1147    ///
1148    /// # Example
1149    ///
1150    /// ```
1151    /// # use {
1152    /// #   wasmtime::{
1153    /// #     error::{Result},
1154    /// #     component::{ Component, Linker, Resource, ResourceTable},
1155    /// #     Config, Engine, Store
1156    /// #   },
1157    /// # };
1158    /// #
1159    /// # struct MyResource(u32);
1160    /// # struct Ctx { table: ResourceTable }
1161    /// #
1162    /// # async fn foo() -> Result<()> {
1163    /// # let mut config = Config::new();
1164    /// # let engine = Engine::new(&config)?;
1165    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1166    /// # let mut linker = Linker::new(&engine);
1167    /// # let component = Component::new(&engine, "")?;
1168    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1169    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1170    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1171    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1172    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1173    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1174    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1175    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1176    ///    Ok(())
1177    /// }).await??;
1178    /// # Ok(())
1179    /// # }
1180    /// ```
1181    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1182    where
1183        T: Send + 'static,
1184    {
1185        ensure!(
1186            self.0.concurrency_support(),
1187            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1188        );
1189        self.do_run_concurrent(fun, false).await
1190    }
1191
1192    pub(super) async fn run_concurrent_trap_on_idle<R>(
1193        self,
1194        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1195    ) -> Result<R>
1196    where
1197        T: Send + 'static,
1198    {
1199        self.do_run_concurrent(fun, true).await
1200    }
1201
1202    async fn do_run_concurrent<R>(
1203        mut self,
1204        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1205        trap_on_idle: bool,
1206    ) -> Result<R>
1207    where
1208        T: Send + 'static,
1209    {
1210        debug_assert!(self.0.concurrency_support());
1211        check_recursive_run();
1212        let token = StoreToken::new(self.as_context_mut());
1213
1214        struct Dropper<'a, T: 'static, V> {
1215            store: StoreContextMut<'a, T>,
1216            value: ManuallyDrop<V>,
1217        }
1218
1219        impl<'a, T, V> Drop for Dropper<'a, T, V> {
1220            fn drop(&mut self) {
1221                tls::set(self.store.0, || {
1222                    // SAFETY: Here we drop the value without moving it for the
1223                    // first and only time -- per the contract for `Drop::drop`,
1224                    // this code won't run again, and the `value` field will no
1225                    // longer be accessible.
1226                    unsafe { ManuallyDrop::drop(&mut self.value) }
1227                });
1228            }
1229        }
1230
1231        let accessor = &Accessor::new(token);
1232        let dropper = &mut Dropper {
1233            store: self,
1234            value: ManuallyDrop::new(fun(accessor)),
1235        };
1236        // SAFETY: We never move `dropper` nor its `value` field.
1237        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1238
1239        dropper
1240            .store
1241            .as_context_mut()
1242            .poll_until(future, trap_on_idle)
1243            .await
1244    }
1245
1246    /// Run this store's event loop.
1247    ///
1248    /// The returned future will resolve when the specified future completes or,
1249    /// if `trap_on_idle` is true, when the event loop can't make further
1250    /// progress.
1251    async fn poll_until<R>(
1252        mut self,
1253        mut future: Pin<&mut impl Future<Output = R>>,
1254        trap_on_idle: bool,
1255    ) -> Result<R>
1256    where
1257        T: Send + 'static,
1258    {
1259        struct Reset<'a, T: 'static> {
1260            store: StoreContextMut<'a, T>,
1261            futures: Option<FuturesUnordered<HostTaskFuture>>,
1262        }
1263
1264        impl<'a, T> Drop for Reset<'a, T> {
1265            fn drop(&mut self) {
1266                if let Some(futures) = self.futures.take() {
1267                    *self
1268                        .store
1269                        .0
1270                        .concurrent_state_mut_already_forced_current_thread()
1271                        .futures
1272                        .get_mut() = Some(futures);
1273                }
1274            }
1275        }
1276
1277        loop {
1278            // Take `ConcurrentState::futures` out of the store so we can poll
1279            // it while also safely giving any of the futures inside access to
1280            // `self`.
1281            let futures = self.0.concurrent_state_mut()?.futures.get_mut().take();
1282            let mut reset = Reset {
1283                store: self.as_context_mut(),
1284                futures,
1285            };
1286            let mut next = match reset.futures.as_mut() {
1287                Some(f) => pin!(f.next()),
1288                None => bail_bug!("concurrent state missing futures field"),
1289            };
1290
1291            enum PollResult<R> {
1292                Complete(R),
1293                ProcessWork {
1294                    ready: Vec<WorkItem>,
1295                    low_priority: bool,
1296                },
1297            }
1298
1299            let result = future::poll_fn(|cx| {
1300                // First, poll the future we were passed as an argument and
1301                // return immediately if it's ready.
1302                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1303                    return Poll::Ready(Ok(PollResult::Complete(value)));
1304                }
1305
1306                // Next, poll `ConcurrentState::futures` (which includes any
1307                // pending host tasks and/or background tasks), returning
1308                // immediately if one of them fails.
1309                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1310                    Poll::Ready(Some(output)) => {
1311                        match output {
1312                            Err(e) => return Poll::Ready(Err(e)),
1313                            Ok(()) => {}
1314                        }
1315                        Poll::Ready(true)
1316                    }
1317                    Poll::Ready(None) => Poll::Ready(false),
1318                    Poll::Pending => Poll::Pending,
1319                };
1320
1321                // Next, collect the next batch of work items to process, if
1322                // any.  This will be either all of the high-priority work
1323                // items, or if there are none, a single low-priority work item.
1324                let state = reset.store.0.concurrent_state_mut()?;
1325                let mut ready = mem::take(&mut state.high_priority);
1326                let mut low_priority = false;
1327                if ready.is_empty() {
1328                    if let Some(item) = state.low_priority.pop_back() {
1329                        ready.push(item);
1330                        low_priority = true;
1331                    }
1332                }
1333                if !ready.is_empty() {
1334                    return Poll::Ready(Ok(PollResult::ProcessWork {
1335                        ready,
1336                        low_priority,
1337                    }));
1338                }
1339
1340                // Finally, if we have nothing else to do right now, determine what to do
1341                // based on whether there are any pending futures in
1342                // `ConcurrentState::futures`.
1343                return match next {
1344                    Poll::Ready(true) => {
1345                        // In this case, one of the futures in
1346                        // `ConcurrentState::futures` completed
1347                        // successfully, so we return now and continue
1348                        // the outer loop in case there is another one
1349                        // ready to complete.
1350                        Poll::Ready(Ok(PollResult::ProcessWork {
1351                            ready: Vec::new(),
1352                            low_priority: false,
1353                        }))
1354                    }
1355                    Poll::Ready(false) => {
1356                        // Poll the future we were passed one last time
1357                        // in case one of `ConcurrentState::futures` had
1358                        // the side effect of unblocking it.
1359                        if let Poll::Ready(value) =
1360                            tls::set(reset.store.0, || future.as_mut().poll(cx))
1361                        {
1362                            Poll::Ready(Ok(PollResult::Complete(value)))
1363                        } else {
1364                            // In this case, there are no more pending
1365                            // futures in `ConcurrentState::futures`,
1366                            // there are no remaining work items, _and_
1367                            // the future we were passed as an argument
1368                            // still hasn't completed.
1369                            if trap_on_idle {
1370                                // `trap_on_idle` is true, so we exit
1371                                // immediately.
1372                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1373                            } else {
1374                                // `trap_on_idle` is false, so we assume
1375                                // that future will wake up and give us
1376                                // more work to do when it's ready to.
1377                                Poll::Pending
1378                            }
1379                        }
1380                    }
1381                    // There is at least one pending future in
1382                    // `ConcurrentState::futures` and we have nothing
1383                    // else to do but wait for now, so we return
1384                    // `Pending`.
1385                    Poll::Pending => Poll::Pending,
1386                };
1387            })
1388            .await;
1389
1390            // Put the `ConcurrentState::futures` back into the store before we
1391            // return or handle any work items since one or more of those items
1392            // might append more futures.
1393            drop(reset);
1394
1395            match result? {
1396                // The future we were passed as an argument completed, so we
1397                // return the result.
1398                PollResult::Complete(value) => break Ok(value),
1399                // The future we were passed has not yet completed, so handle
1400                // any work items and then loop again.
1401                PollResult::ProcessWork {
1402                    ready,
1403                    low_priority,
1404                } => {
1405                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1406                        store: StoreContextMut<'a, T>,
1407                        ready: I,
1408                    }
1409
1410                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1411                        fn drop(&mut self) {
1412                            while let Some(item) = self.ready.next() {
1413                                match item {
1414                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1415                                    WorkItem::PushFuture(future) => {
1416                                        tls::set(self.store.0, move || drop(future))
1417                                    }
1418                                    _ => {}
1419                                }
1420                            }
1421                        }
1422                    }
1423
1424                    let mut dispose = Dispose {
1425                        store: self.as_context_mut(),
1426                        ready: ready.into_iter(),
1427                    };
1428
1429                    // If we're about to run a low-priority task, first yield to
1430                    // the executor.  This ensures that it won't be starved of
1431                    // the ability to e.g. update the readiness of sockets,
1432                    // etc. which the guest may be using `thread.yield` along
1433                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
1434                    //
1435                    // This works for e.g. `thread.yield` and callbacks which
1436                    // return `CALLBACK_CODE_YIELD` because we queue a low
1437                    // priority item to resume the task (i.e. resume the thread
1438                    // or call the callback, respectively) just prior to
1439                    // suspending it.  Indeed, as of this writing those are the
1440                    // _only_ situations we queue low-priority tasks.
1441                    // Therefore, we interpret the guest's request to yield as
1442                    // meaning "yield to other guest tasks _and_/_or_ host
1443                    // operations such as updating socket readiness", the latter
1444                    // being the async runtime's responsibility.
1445                    //
1446                    // In the future, if this ends up causing measurable
1447                    // performance issues, this could be optimized such that we
1448                    // only yield periodically (e.g. for batches of low priority
1449                    // items) and not for each and every idividual item.
1450                    if low_priority {
1451                        dispose.store.0.yield_now().await
1452                    }
1453
1454                    while let Some(item) = dispose.ready.next() {
1455                        dispose
1456                            .store
1457                            .as_context_mut()
1458                            .handle_work_item(item)
1459                            .await?;
1460                    }
1461                }
1462            }
1463        }
1464    }
1465
1466    /// Handle the specified work item, possibly resuming a fiber if applicable.
1467    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1468    where
1469        T: Send,
1470    {
1471        log::trace!("handle work item {item:?}");
1472        match item {
1473            WorkItem::PushFuture(future) => {
1474                self.0
1475                    .concurrent_state_mut()?
1476                    .futures_mut()?
1477                    .push(future.into_inner());
1478            }
1479            WorkItem::ResumeFiber(fiber) => {
1480                self.0.resume_fiber(fiber).await?;
1481            }
1482            WorkItem::ResumeThread(_, thread) => {
1483                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1484                    &mut self.0.concurrent_state_mut()?.get_mut(thread.thread)?.state,
1485                    GuestThreadState::Running,
1486                ) {
1487                    self.0.resume_fiber(fiber).await?;
1488                } else {
1489                    bail_bug!("cannot resume non-pending thread {thread:?}");
1490                }
1491            }
1492            WorkItem::GuestCall(_, call) => {
1493                if call.is_ready(self.0)? {
1494                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1495                } else {
1496                    let state = self.0.concurrent_state_mut()?;
1497                    let task = state.get_mut(call.thread.task)?;
1498                    if !task.starting_sent {
1499                        task.starting_sent = true;
1500                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1501                            Waitable::Guest(call.thread.task).set_event(
1502                                state,
1503                                Some(Event::Subtask {
1504                                    status: Status::Starting,
1505                                }),
1506                            )?;
1507                        }
1508                    }
1509
1510                    let instance = state.get_mut(call.thread.task)?.instance;
1511                    self.0
1512                        .instance_state(instance)
1513                        .concurrent_state()
1514                        .pending
1515                        .insert(call.thread, call.kind);
1516                }
1517            }
1518            WorkItem::WorkerFunction(fun) => {
1519                self.run_on_worker(WorkerItem::Function(fun)).await?;
1520            }
1521        }
1522
1523        Ok(())
1524    }
1525
1526    /// Execute the specified guest call on a worker fiber.
1527    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1528    where
1529        T: Send,
1530    {
1531        let worker = if let Some(fiber) = self.0.concurrent_state_mut()?.worker.take() {
1532            fiber
1533        } else {
1534            fiber::make_fiber(self.0, move |store| {
1535                loop {
1536                    let Some(item) = store.concurrent_state_mut()?.worker_item.take() else {
1537                        bail_bug!("worker_item not present when resuming fiber")
1538                    };
1539                    match item {
1540                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1541                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1542                    }
1543
1544                    store.suspend(SuspendReason::NeedWork)?;
1545                }
1546            })?
1547        };
1548
1549        let worker_item = &mut self.0.concurrent_state_mut()?.worker_item;
1550        assert!(worker_item.is_none());
1551        *worker_item = Some(item);
1552
1553        self.0.resume_fiber(worker).await
1554    }
1555
1556    /// Wrap the specified host function in a future which will call it, passing
1557    /// it an `&Accessor<T>`.
1558    ///
1559    /// See the `Accessor` documentation for details.
1560    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1561    where
1562        T: 'static,
1563        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1564            + Send
1565            + Sync
1566            + 'static,
1567        R: Send + Sync + 'static,
1568    {
1569        let token = StoreToken::new(self);
1570        async move {
1571            let mut accessor = Accessor::new(token);
1572            closure(&mut accessor).await
1573        }
1574    }
1575
1576    /// Returns an iterator over the current async call stack defined by the
1577    /// component model.
1578    ///
1579    /// This can be used, for example to correlate a host import call with which
1580    /// root export task originally called it.
1581    ///
1582    /// Tasks are yielded "youngest first" where the first item in the iterator
1583    /// is the current task, and the last item in the iterator is the original
1584    /// call.
1585    pub fn async_call_stack(&mut self) -> Result<impl Iterator<Item = GuestTaskId>> {
1586        let mut cur = Some(self.0.current_thread()?);
1587        let state = self.0.concurrent_state_mut()?;
1588        Ok(core::iter::from_fn(move || {
1589            while let Some(t) = cur {
1590                cur = state.parent(t);
1591                if let Some(thread) = t.guest() {
1592                    return Some(GuestTaskId(thread.task));
1593                }
1594            }
1595
1596            None
1597        }))
1598    }
1599}
1600
1601impl StoreOpaque {
1602    /// Returns the currently-running thread, promoting any deferred lazy thread
1603    /// into a fully-materialized `CurrentThread`.
1604    pub(crate) fn current_thread(&mut self) -> Result<CurrentThread> {
1605        // Without concurrency support there is nothing to force.
1606        if !self.concurrency_support() {
1607            return Ok(CurrentThread::None);
1608        }
1609
1610        // If the JIT-visible current thread isn't a deferred thread then
1611        // `ConcurrentState` is already up to date.
1612        if !self
1613            .vm_store_context_mut()
1614            .current_thread_mut()
1615            .is_deferred()
1616        {
1617            return Ok(self
1618                .concurrent_state_mut_already_forced_current_thread()
1619                .unforced_current_thread);
1620        }
1621
1622        // The component instance whose adapters pushed the deferred frames; all
1623        // frames in a guest-to-guest, sync-to-sync call chain of fused adapters
1624        // live within a single `wasmtime::component::Instance` (because
1625        // cross-`wasmtime::component::Instance` calls don't go through fused
1626        // adapters), and guest code only ever runs as a guest thread, so the
1627        // chain's base thread is already materialized in `ConcurrentState` and
1628        // we can get the `ComponentInstanceId` shared by the whole chain from
1629        // here.
1630        let state = self.concurrent_state_mut_without_forcing_current_thread();
1631        let id = match state.unforced_current_thread.guest().copied() {
1632            Some(thread) => state.get_mut(thread.task)?.instance.instance,
1633            None => bail_bug!("deferred component-model thread with non-guest base"),
1634        };
1635
1636        // Collect the deferred frames pushed inline by fused adapters, walking
1637        // the `parent` chain from innermost to the base.
1638        let mut frames = Vec::new();
1639        let mut cur = *self.vm_store_context_mut().current_thread_mut();
1640        while let Some(ptr) = cur.as_deferred() {
1641            // SAFETY: `ptr` points at a `VMDeferredThread` living in a fused
1642            // adapter's stack frame that is suspended below us on the stack
1643            // (mid-call, waiting for this nested call to return), so the
1644            // referent is still valid and exclusively ours to read.
1645            let deferred = unsafe { ptr.as_non_null().as_ref() };
1646            frames.push((
1647                deferred.callee_async != 0,
1648                deferred.callee_instance,
1649                deferred.saved_context,
1650            ));
1651            cur = deferred.parent;
1652        }
1653
1654        // Mark the current thread forced *before* replaying so that any
1655        // reentrant `force_current_thread` call short-circuits via the
1656        // non-deferred path above.
1657        *self.vm_store_context_mut().current_thread_mut() = VMLazyThread::forced();
1658
1659        // Save the current context, as we need to overwrite it while replaying
1660        // below.
1661        let current_context = *self.vm_store_context_mut().component_context_mut();
1662
1663        // Replay the deferred `enter_guest_sync_call`s outermost-first so that
1664        // the resulting `ConcurrentState` matches what the non-deferred path
1665        // would have otherwise produced.
1666        for (callee_async, callee_instance, saved_context) in frames.into_iter().rev() {
1667            // Restore the caller's context slots so that we save the correct
1668            // values into the caller's thread, exactly as the non-deferred path
1669            // would have on entry.
1670            *self.vm_store_context_mut().component_context_mut() = saved_context;
1671            let callee = RuntimeInstance {
1672                instance: id,
1673                index: RuntimeComponentInstanceIndex::from_u32(callee_instance),
1674            };
1675            self.enter_guest_sync_call(None, callee_async, callee)?;
1676        }
1677
1678        // Replaying done; restore the current context.
1679        *self.vm_store_context_mut().component_context_mut() = current_context;
1680
1681        Ok(self
1682            .concurrent_state_mut_without_forcing_current_thread()
1683            .unforced_current_thread)
1684    }
1685
1686    fn current_guest_thread(&mut self) -> Result<QualifiedThreadId> {
1687        match self.current_thread()?.guest() {
1688            Some(id) => Ok(*id),
1689            None => bail_bug!("current thread is not a guest thread"),
1690        }
1691    }
1692
1693    fn current_host_thread(&mut self) -> Result<TableId<HostTask>> {
1694        match self.current_thread()?.host() {
1695            Some(id) => Ok(id),
1696            None => bail_bug!("current thread is not a host thread"),
1697        }
1698    }
1699
1700    /// Returns whether there's a pending cancellation on the current guest thread,
1701    /// consuming the event if so.
1702    fn take_pending_cancellation(&mut self) -> Result<bool> {
1703        let thread = self.current_guest_thread()?;
1704        let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1705        if let Some(Event::Cancelled) = task.event {
1706            task.event.take();
1707            return Ok(true);
1708        }
1709        Ok(false)
1710    }
1711
1712    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1713    /// guest-to-guest call or a sync host-to-guest call.
1714    ///
1715    /// This task will only be used for the purpose of handling calls to
1716    /// intrinsic functions; both parameter lowering and result lifting are
1717    /// assumed to be taken care of elsewhere.
1718    ///
1719    /// NB: for sync-to-sync, guest-to-guest calls we delay task construction in
1720    /// fused adapters, see `StoreOpaque::current_thread`, `VMDeferredThread`,
1721    /// and `lower_fact_enter_sync_call`. Make sure all this stuff stays in
1722    /// sync!
1723    pub(crate) fn enter_guest_sync_call(
1724        &mut self,
1725        guest_caller: Option<RuntimeInstance>,
1726        callee_async: bool,
1727        callee: RuntimeInstance,
1728    ) -> Result<()> {
1729        log::trace!("enter sync call {callee:?}");
1730        if !self.concurrency_support() {
1731            return self.enter_call_not_concurrent();
1732        }
1733
1734        let thread = self.current_thread()?;
1735        let state = self.concurrent_state_mut()?;
1736        let instance = if let Some(thread) = thread.guest() {
1737            Some(state.get_mut(thread.task)?.instance)
1738        } else {
1739            None
1740        };
1741        if guest_caller.is_some() {
1742            debug_assert_eq!(instance, guest_caller);
1743        }
1744        let guest_thread = GuestTask::new(
1745            state,
1746            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1747            LiftResult {
1748                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1749                ty: TypeTupleIndex::reserved_value(),
1750                memory: None,
1751                string_encoding: StringEncoding::Utf8,
1752            },
1753            if let Some(thread) = thread.guest() {
1754                Caller::Guest { thread: *thread }
1755            } else {
1756                Caller::Host {
1757                    tx: None,
1758                    host_future_present: false,
1759                    caller: thread,
1760                }
1761            },
1762            None,
1763            callee,
1764            callee_async,
1765        )?;
1766
1767        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1768            guest_thread.thread,
1769            self,
1770            callee.index,
1771        )?;
1772        self.set_thread(guest_thread)?;
1773
1774        Ok(())
1775    }
1776
1777    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1778    ///
1779    /// NB: for sync-to-sync, guest-to-guest calls we delay task construction in
1780    /// fused adapters and then when the call returns we check to see if the
1781    /// task's contruction was forced and if not avoid calling out of the JIT
1782    /// code to this function. See `lower_fact_exit_sync_call`. Make sure all
1783    /// this stuff stays in sync!
1784    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1785        if !self.concurrency_support() {
1786            return Ok(self.exit_call_not_concurrent());
1787        }
1788        let thread = match self.set_thread(CurrentThread::None)?.guest() {
1789            Some(t) => *t,
1790            None => bail_bug!("expected task when exiting"),
1791        };
1792        let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1793        let instance = task.instance;
1794        let caller = match &task.caller {
1795            &Caller::Guest { thread } => thread.into(),
1796            &Caller::Host { caller, .. } => caller,
1797        };
1798        task.lift_result = None;
1799        task.exited = true;
1800        self.set_thread(caller)?;
1801
1802        log::trace!("exit sync call {instance:?}");
1803        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1804
1805        Ok(())
1806    }
1807
1808    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1809    /// transition to the host.
1810    ///
1811    /// FIXME: this is called for all guest->host transitions and performs some
1812    /// relatively expensive table manipulations. This would ideally be
1813    /// optimized to avoid the full allocation of a `HostTask` in at least some
1814    /// situations.
1815    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1816        if !self.concurrency_support() {
1817            self.enter_call_not_concurrent()?;
1818            return Ok(None);
1819        }
1820        let caller = self.current_guest_thread()?;
1821        let state = self.concurrent_state_mut()?;
1822        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1823        log::trace!("new host task {task:?}");
1824        self.set_thread(task)?;
1825        Ok(Some(task))
1826    }
1827
1828    /// Invoked before lowering the results of a host task to the guest.
1829    ///
1830    /// This is used to update the current thread annotations within the store
1831    /// to ensure that it reflects the guest task, not the host task, since
1832    /// lowering may execute guest code.
1833    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1834        if !self.concurrency_support() {
1835            return Ok(());
1836        }
1837        let task = self.current_host_thread()?;
1838        let caller = self.concurrent_state_mut()?.get_mut(task)?.caller;
1839        self.set_thread(caller)?;
1840        Ok(())
1841    }
1842
1843    /// Dual of `host_task_create` and signifies that the host has finished and
1844    /// will be cleaned up.
1845    ///
1846    /// Note that this isn't invoked when the host is invoked asynchronously and
1847    /// the host isn't complete yet. In that situation the host task persists
1848    /// and will be cleaned up separately in `subtask_drop`
1849    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1850        match task {
1851            Some(task) => {
1852                log::trace!("delete host task {task:?}");
1853                self.concurrent_state_mut()?.delete(task)?;
1854            }
1855            None => {
1856                self.exit_call_not_concurrent();
1857            }
1858        }
1859        Ok(())
1860    }
1861
1862    /// Determine whether the specified instance may be entered from the host.
1863    ///
1864    /// We return `true` here only if all of the following hold:
1865    ///
1866    /// - The top-level instance is not already on the current task's call stack.
1867    /// - The instance is not in need of a post-return function call.
1868    /// - `self` has not been poisoned due to a trap.
1869    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1870        if self.trapped() {
1871            return Ok(false);
1872        }
1873        if !self.concurrency_support() {
1874            return Ok(true);
1875        }
1876        let mut cur = Some(self.current_thread()?);
1877        let state = self.concurrent_state_mut()?;
1878        while let Some(t) = cur {
1879            if let Some(thread) = t.guest() {
1880                let task = state.get_mut(thread.task)?;
1881                // Note that we only compare top-level instance IDs here.
1882                // The idea is that the host is not allowed to recursively
1883                // enter a top-level instance even if the specific leaf
1884                // instance is not on the stack. This the behavior defined
1885                // in the spec, and it allows us to elide runtime checks in
1886                // guest-to-guest adapters.
1887                if task.instance.instance == instance.instance {
1888                    return Ok(false);
1889                }
1890            }
1891            cur = state.parent(t);
1892        }
1893        Ok(true)
1894    }
1895
1896    /// Helper function to retrieve the `InstanceState` for the
1897    /// specified instance.
1898    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1899        self.component_instance_mut(instance.instance)
1900            .instance_state(instance.index)
1901    }
1902
1903    /// Configure the currently running `thread`.
1904    ///
1905    /// This will save off any state necessary for the previous thread, if
1906    /// applicable, and then it'll additionally update state for `thread` if
1907    /// needed too.
1908    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1909        let thread = thread.into();
1910        let state = self.concurrent_state_mut()?;
1911        let old_thread = mem::replace(&mut state.unforced_current_thread, thread);
1912
1913        // First thing to do after swapping threads is updating the context
1914        // slots for this thread within the store. This restores the behavior of
1915        // `context.{get,set}`. This involves taking the old state out of the
1916        // store, saving it in the thread that's being swapped from, and doing
1917        // the inverse for the new thread. When debug assertions are enabled
1918        // this also leaves behind sentinel values to try to uncover bugs where
1919        // this may be forgotten.
1920        if let Some(old_thread) = old_thread.guest() {
1921            let old_context = *self.vm_store_context_mut().component_context_mut();
1922            self.concurrent_state_mut()?
1923                .get_mut(old_thread.thread)?
1924                .context = old_context;
1925        }
1926        if cfg!(debug_assertions) {
1927            *self.vm_store_context_mut().component_context_mut() =
1928                [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1929        }
1930        if let Some(thread) = thread.guest() {
1931            let thread = self.concurrent_state_mut()?.get_mut(thread.thread)?;
1932            let context = thread.context;
1933            if cfg!(debug_assertions) {
1934                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1935            }
1936            *self.vm_store_context_mut().component_context_mut() = context;
1937        }
1938
1939        // Each time we switch threads, we conservatively set `task_may_block`
1940        // to `false` for the component instance we're switching away from (if
1941        // any), meaning it will be `false` for any new thread created for that
1942        // instance unless explicitly set otherwise.
1943        //
1944        // Additionally if we're switching to a new thread, set its component
1945        // instance's `task_may_block` according to where it left off.
1946        let state = self.concurrent_state_mut()?;
1947        if let Some(old_thread) = old_thread.guest() {
1948            let instance = state.get_mut(old_thread.task)?.instance.instance;
1949            self.component_instance_mut(instance)
1950                .set_task_may_block(false)
1951        }
1952
1953        if thread.guest().is_some() {
1954            self.set_task_may_block()?;
1955        }
1956
1957        // Keep the JIT-visible current-thread pointer in sync.
1958        *self.vm_store_context_mut().current_thread_mut() = if thread.is_none() {
1959            VMLazyThread::none()
1960        } else {
1961            VMLazyThread::forced()
1962        };
1963
1964        Ok(old_thread)
1965    }
1966
1967    /// Set the global variable representing whether the current task may block
1968    /// prior to entering Wasm code.
1969    fn set_task_may_block(&mut self) -> Result<()> {
1970        let guest_thread = self.current_guest_thread()?;
1971        let state = self.concurrent_state_mut()?;
1972        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1973        let may_block = self.concurrent_state_mut()?.may_block(guest_thread.task)?;
1974        self.component_instance_mut(instance)
1975            .set_task_may_block(may_block);
1976        Ok(())
1977    }
1978
1979    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1980        if !self.concurrency_support() {
1981            return Ok(());
1982        }
1983        let task = self.current_guest_thread()?.task;
1984        let state = self.concurrent_state_mut()?;
1985        let instance = state.get_mut(task)?.instance.instance;
1986        let task_may_block = self.component_instance(instance).get_task_may_block();
1987
1988        if task_may_block {
1989            Ok(())
1990        } else {
1991            Err(Trap::CannotBlockSyncTask.into())
1992        }
1993    }
1994
1995    /// Record that we're about to enter a (sub-)component instance which does
1996    /// not support more than one concurrent, stackful activation, meaning it
1997    /// cannot be entered again until the next call returns.
1998    fn enter_instance(&mut self, instance: RuntimeInstance) {
1999        log::trace!("enter {instance:?}");
2000        self.instance_state(instance)
2001            .concurrent_state()
2002            .do_not_enter = true;
2003    }
2004
2005    /// Record that we've exited a (sub-)component instance previously entered
2006    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
2007    /// See the documentation for the latter for details.
2008    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
2009        log::trace!("exit {instance:?}");
2010        self.instance_state(instance)
2011            .concurrent_state()
2012            .do_not_enter = false;
2013        self.partition_pending(instance)
2014    }
2015
2016    /// Iterate over `InstanceState::pending`, moving any ready items into the
2017    /// "high priority" work item queue.
2018    ///
2019    /// Also, notify `ConcurrentState::ready_for_concurrent_call_waker` if
2020    /// present.
2021    ///
2022    /// See `GuestCall::is_ready` for details.
2023    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
2024        for (thread, kind) in
2025            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
2026        {
2027            let call = GuestCall { thread, kind };
2028            if call.is_ready(self)? {
2029                self.concurrent_state_mut()?
2030                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
2031            } else {
2032                self.instance_state(instance)
2033                    .concurrent_state()
2034                    .pending
2035                    .insert(call.thread, call.kind);
2036            }
2037        }
2038
2039        if let Some(waker) = self
2040            .concurrent_state_mut()?
2041            .ready_for_concurrent_call_waker
2042            .take()
2043        {
2044            waker.wake();
2045        }
2046
2047        Ok(())
2048    }
2049
2050    /// Implements the `backpressure.{inc,dec}` intrinsics.
2051    pub(crate) fn backpressure_modify(
2052        &mut self,
2053        caller_instance: RuntimeInstance,
2054        modify: impl FnOnce(u16) -> Option<u16>,
2055    ) -> Result<()> {
2056        let state = self.instance_state(caller_instance).concurrent_state();
2057        let old = state.backpressure;
2058        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
2059        state.backpressure = new;
2060
2061        if old > 0 && new == 0 {
2062            // Backpressure was previously enabled and is now disabled; move any
2063            // newly-eligible guest calls to the "high priority" queue.
2064            self.partition_pending(caller_instance)?;
2065        }
2066
2067        Ok(())
2068    }
2069
2070    /// Resume the specified fiber, giving it exclusive access to the specified
2071    /// store.
2072    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
2073        let old_thread = self.current_thread()?;
2074        log::trace!("resume_fiber: save current thread {old_thread:?}");
2075
2076        let fiber = fiber::resolve_or_release(self, fiber).await?;
2077
2078        self.set_thread(old_thread)?;
2079
2080        let state = self.concurrent_state_mut()?;
2081
2082        if let Some(ot) = old_thread.guest() {
2083            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
2084        }
2085        log::trace!("resume_fiber: restore current thread {old_thread:?}");
2086
2087        if let Some(mut fiber) = fiber {
2088            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
2089            // See the `SuspendReason` documentation for what each case means.
2090            let reason = match state.suspend_reason.take() {
2091                Some(r) => r,
2092                None => bail_bug!("suspend reason missing when resuming fiber"),
2093            };
2094            match reason {
2095                SuspendReason::NeedWork => {
2096                    if state.worker.is_none() {
2097                        state.worker = Some(fiber);
2098                    } else {
2099                        fiber.dispose(self);
2100                    }
2101                }
2102                SuspendReason::Yielding {
2103                    thread,
2104                    cancellable,
2105                    ..
2106                } => {
2107                    state.get_mut(thread.thread)?.state =
2108                        GuestThreadState::Ready { fiber, cancellable };
2109                    let instance = state.get_mut(thread.task)?.instance.index;
2110                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
2111                }
2112                SuspendReason::ExplicitlySuspending { thread, .. } => {
2113                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
2114                }
2115                SuspendReason::Waiting { set, thread, .. } => {
2116                    let old = state
2117                        .get_mut(set)?
2118                        .waiting
2119                        .insert(thread, WaitMode::Fiber(fiber));
2120                    assert!(old.is_none());
2121                }
2122            };
2123        } else {
2124            log::trace!("resume_fiber: fiber has exited");
2125        }
2126
2127        Ok(())
2128    }
2129
2130    /// Suspend the current fiber, storing the reason in
2131    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
2132    /// it should be resumed.
2133    ///
2134    /// See the `SuspendReason` documentation for details.
2135    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
2136        log::trace!("suspend fiber: {reason:?}");
2137
2138        // If we're yielding or waiting on behalf of a guest thread, we'll need to
2139        // pop the call context which manages resource borrows before suspending
2140        // and then push it again once we've resumed.
2141        let task = match &reason {
2142            SuspendReason::Yielding { thread, .. }
2143            | SuspendReason::Waiting { thread, .. }
2144            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
2145            SuspendReason::NeedWork => None,
2146        };
2147
2148        let old_guest_thread = if task.is_some() {
2149            self.current_thread()?
2150        } else {
2151            CurrentThread::None
2152        };
2153
2154        // We should not have reached here unless either there's no current
2155        // task, or the current task is permitted to block.  In addition, we
2156        // special-case `thread.switch-to` and waiting for a subtask to go from
2157        // `starting` to `started`, both of which we consider non-blocking
2158        // operations despite requiring a suspend.
2159        debug_assert!(
2160            matches!(
2161                reason,
2162                SuspendReason::ExplicitlySuspending {
2163                    skip_may_block_check: true,
2164                    ..
2165                } | SuspendReason::Waiting {
2166                    skip_may_block_check: true,
2167                    ..
2168                } | SuspendReason::Yielding {
2169                    skip_may_block_check: true,
2170                    ..
2171                }
2172            ) || old_guest_thread
2173                .guest()
2174                .map(|thread| self.concurrent_state_mut()?.may_block(thread.task))
2175                .transpose()?
2176                .unwrap_or(true)
2177        );
2178
2179        let suspend_reason = &mut self.concurrent_state_mut()?.suspend_reason;
2180        assert!(suspend_reason.is_none());
2181        *suspend_reason = Some(reason);
2182
2183        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
2184
2185        if task.is_some() {
2186            self.set_thread(old_guest_thread)?;
2187        }
2188
2189        Ok(())
2190    }
2191
2192    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2193        let caller = self.current_guest_thread()?;
2194        let state = self.concurrent_state_mut()?;
2195
2196        if waitable.common(state)?.set.is_some() {
2197            bail!(Trap::WaitableSyncAndAsync);
2198        }
2199
2200        let set = state.get_mut(caller.thread)?.sync_call_set;
2201        waitable.join(state, Some(set))?;
2202        self.suspend(SuspendReason::Waiting {
2203            set,
2204            thread: caller,
2205            skip_may_block_check: false,
2206        })?;
2207        let state = self.concurrent_state_mut()?;
2208        waitable.join(state, None)
2209    }
2210
2211    /// Cleans up the data structures backing the `guest_thread` specified,
2212    /// removing it from the internal tables of `runtime_instance` as well.
2213    ///
2214    /// This function is used whenever a guest thread has fully exited and
2215    /// completed. This'll clean up the associated `GuestThread` structure and
2216    /// related resources it contains.
2217    ///
2218    /// Other functionality that this implements is:
2219    ///
2220    /// * This will perform conditional cleanup of the `GuestTask` that owns
2221    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
2222    /// * If there are no more threads in the `GuestTask` that this thread is
2223    ///   associated with, and if the task hasn't produced a result (e.g. it's not
2224    ///   returned or cancelled), then a trap will be raised that a result
2225    ///   wasn't ever produced.
2226    /// * If this task is finished, meaning the top-level thread exited and
2227    ///   additionally it's been returned or cancelled, then this will handle
2228    ///   management of the store's "active interesting tasks" counter.
2229    ///
2230    /// Effectively this is intended to be a "narrow waist" through which many
2231    /// destruction operations are funneled through.
2232    fn cleanup_thread(
2233        &mut self,
2234        guest_thread: QualifiedThreadId,
2235        runtime_instance: RuntimeInstance,
2236        cleanup_task: CleanupTask,
2237    ) -> Result<()> {
2238        let state = self.concurrent_state_mut()?;
2239        let thread_data = state.get_mut(guest_thread.thread)?;
2240        let sync_call_set = thread_data.sync_call_set;
2241        if let Some(guest_id) = thread_data.instance_rep {
2242            self.instance_state(runtime_instance)
2243                .thread_handle_table()
2244                .guest_thread_remove(guest_id)?;
2245        }
2246        let state = self.concurrent_state_mut()?;
2247
2248        // Clean up any pending subtasks in the sync_call_set
2249        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2250            if let Some(Event::Subtask {
2251                status: Status::Returned | Status::ReturnCancelled,
2252            }) = waitable.common(state)?.event
2253            {
2254                waitable.delete_from(state)?;
2255            }
2256        }
2257
2258        state.delete(guest_thread.thread)?;
2259        state.delete(sync_call_set)?;
2260        let task = state.get_mut(guest_thread.task)?;
2261        task.threads.remove(&guest_thread.thread);
2262
2263        if task.threads.is_empty() && !task.returned_or_cancelled() {
2264            bail!(Trap::NoAsyncResult);
2265        }
2266        let ready_to_delete = task.ready_to_delete();
2267
2268        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2269            task.decremented_interesting_task_count = true;
2270
2271            debug_assert!(state.interesting_tasks > 0);
2272            state.interesting_tasks -= 1;
2273            if state.interesting_tasks == 0
2274                && let Some(waker) = state.interesting_tasks_empty_waker.take()
2275            {
2276                waker.wake();
2277            }
2278        }
2279
2280        match cleanup_task {
2281            CleanupTask::Yes => {
2282                if ready_to_delete {
2283                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2284                }
2285            }
2286            CleanupTask::No => {}
2287        }
2288
2289        Ok(())
2290    }
2291
2292    /// Performs cancellation of the `guest_task` specified with the
2293    /// precondition that the task hasn't lowered its parameters.
2294    ///
2295    /// In this situation the task hasn't ever been started meaning it hasn't
2296    /// actually run any wasm code yet. This requires cleaning up metadata such
2297    /// as thread information attached to the task.
2298    ///
2299    /// The main two entrypoints for this function are:
2300    ///
2301    /// * Task cancellation via `subtask.cancel`, the intrinsic.
2302    /// * Dropping a host `call_async` future which needs to cancel the task
2303    ///   because it cannot reference its parameters any more.
2304    fn cancel_guest_subtask_without_lowered_parameters(
2305        &mut self,
2306        caller_instance: RuntimeInstance,
2307        guest_task: TableId<GuestTask>,
2308    ) -> Result<()> {
2309        let concurrent_state = self.concurrent_state_mut()?;
2310        let task = concurrent_state.get_mut(guest_task)?;
2311        assert!(!task.already_lowered_parameters());
2312        // The task is in a `starting` state, meaning it hasn't run at
2313        // all yet.  Here we update its fields to indicate that it is
2314        // ready to delete immediately once `subtask.drop` is called.
2315        task.lower_params = None;
2316        task.lift_result = None;
2317        task.exited = true;
2318        let instance = task.instance;
2319
2320        // Clean up the thread within this task as it's now never going
2321        // to run.
2322        assert_eq!(1, task.threads.len());
2323        let thread = *task.threads.iter().next().unwrap();
2324        self.cleanup_thread(
2325            QualifiedThreadId {
2326                task: guest_task,
2327                thread,
2328            },
2329            caller_instance,
2330            CleanupTask::No,
2331        )?;
2332
2333        // Not yet started; cancel and remove from pending
2334        let pending = &mut self.instance_state(instance).concurrent_state().pending;
2335        let pending_count = pending.len();
2336        pending.retain(|thread, _| thread.task != guest_task);
2337        // If there were no pending threads for this task, we're in an error state
2338        if pending.len() == pending_count {
2339            bail!(Trap::SubtaskCancelAfterTerminal);
2340        }
2341        Ok(())
2342    }
2343
2344    /// Used by `ResourceTables` to record the scope of a borrow to get undone
2345    /// in the future.
2346    pub(crate) fn current_scope_id(&mut self) -> Result<Option<u32>> {
2347        if !self.concurrency_support() {
2348            return self.current_scope_id_not_concurrent();
2349        }
2350        let (bits, is_host) = match self.current_thread()? {
2351            CurrentThread::Guest(id) => (id.task.rep(), false),
2352            CurrentThread::Host(id) => (id.rep(), true),
2353            CurrentThread::None => return Ok(None),
2354        };
2355        assert_eq!((bits << 1) >> 1, bits);
2356        Ok(Some((bits << 1) | u32::from(is_host)))
2357    }
2358}
2359
2360enum CleanupTask {
2361    Yes,
2362    No,
2363}
2364
2365impl Instance {
2366    /// Get the next pending event for the specified task and (optional)
2367    /// waitable set, along with the waitable handle if applicable.
2368    fn get_event(
2369        self,
2370        store: &mut StoreOpaque,
2371        guest_task: TableId<GuestTask>,
2372        set: Option<TableId<WaitableSet>>,
2373        cancellable: bool,
2374    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2375        let state = store.concurrent_state_mut()?;
2376
2377        let event = &mut state.get_mut(guest_task)?.event;
2378        if let Some(ev) = event
2379            && (cancellable || !matches!(ev, Event::Cancelled))
2380        {
2381            log::trace!("deliver event {ev:?} to {guest_task:?}");
2382            let ev = *ev;
2383            *event = None;
2384            return Ok(Some((ev, None)));
2385        }
2386
2387        let set = match set {
2388            Some(set) => set,
2389            None => return Ok(None),
2390        };
2391        let waitable = match state.get_mut(set)?.ready.pop_first() {
2392            Some(v) => v,
2393            None => return Ok(None),
2394        };
2395
2396        let common = waitable.common(state)?;
2397        let handle = match common.handle {
2398            Some(h) => h,
2399            None => bail_bug!("handle not set when delivering event"),
2400        };
2401        let event = match common.event.take() {
2402            Some(e) => e,
2403            None => bail_bug!("event not set when delivering event"),
2404        };
2405
2406        log::trace!(
2407            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2408        );
2409
2410        waitable.on_delivery(store, self, event)?;
2411
2412        Ok(Some((event, Some((waitable, handle)))))
2413    }
2414
2415    /// Handle the `CallbackCode` returned from an async-lifted export or its
2416    /// callback.
2417    ///
2418    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2419    /// using `handle_guest_call`.
2420    fn handle_callback_code(
2421        self,
2422        store: &mut StoreOpaque,
2423        guest_thread: QualifiedThreadId,
2424        runtime_instance: RuntimeComponentInstanceIndex,
2425        code: u32,
2426    ) -> Result<Option<GuestCall>> {
2427        let (code, set) = unpack_callback_code(code);
2428
2429        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2430
2431        let state = store.concurrent_state_mut()?;
2432
2433        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2434            let set = store
2435                .instance_state(self.runtime_instance(runtime_instance))
2436                .handle_table()
2437                .waitable_set_rep(handle)?;
2438
2439            Ok(TableId::<WaitableSet>::new(set))
2440        };
2441
2442        Ok(match code {
2443            callback_code::EXIT => {
2444                log::trace!("implicit thread {guest_thread:?} completed");
2445                let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2446                task.exited = true;
2447                task.callback = None;
2448                store.cleanup_thread(
2449                    guest_thread,
2450                    self.runtime_instance(runtime_instance),
2451                    CleanupTask::Yes,
2452                )?;
2453                None
2454            }
2455            callback_code::YIELD => {
2456                let task = state.get_mut(guest_thread.task)?;
2457                // If an `Event::Cancelled` is pending, we'll deliver that;
2458                // otherwise, we'll deliver `Event::None`.  Note that
2459                // `GuestTask::event` is only ever set to one of those two
2460                // `Event` variants.
2461                if let Some(event) = task.event {
2462                    assert!(matches!(event, Event::None | Event::Cancelled));
2463                } else {
2464                    task.event = Some(Event::None);
2465                }
2466                let call = GuestCall {
2467                    thread: guest_thread,
2468                    kind: GuestCallKind::DeliverEvent {
2469                        instance: self,
2470                        set: None,
2471                    },
2472                };
2473                if state.may_block(guest_thread.task)? {
2474                    // Push this thread onto the "low priority" queue so it runs
2475                    // after any other threads have had a chance to run.
2476                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2477                    None
2478                } else {
2479                    // Yielding in a non-blocking context is defined as a no-op
2480                    // according to the spec, so we must run this thread
2481                    // immediately without allowing any others to run.
2482                    Some(call)
2483                }
2484            }
2485            callback_code::WAIT => {
2486                // The task may only return `WAIT` if it was created for a call
2487                // to an async export).  Otherwise, we'll trap.
2488                state.check_blocking_for(guest_thread.task)?;
2489
2490                let set = get_set(store, set)?;
2491                let state = store.concurrent_state_mut()?;
2492
2493                if state.get_mut(guest_thread.task)?.event.is_some()
2494                    || !state.get_mut(set)?.ready.is_empty()
2495                {
2496                    // An event is immediately available; deliver it ASAP.
2497                    state.push_high_priority(WorkItem::GuestCall(
2498                        runtime_instance,
2499                        GuestCall {
2500                            thread: guest_thread,
2501                            kind: GuestCallKind::DeliverEvent {
2502                                instance: self,
2503                                set: Some(set),
2504                            },
2505                        },
2506                    ));
2507                } else {
2508                    // No event is immediately available.
2509                    //
2510                    // We're waiting, so register to be woken up when an event
2511                    // is published for this waitable set.
2512                    //
2513                    // Here we also set `GuestTask::wake_on_cancel` which allows
2514                    // `subtask.cancel` to interrupt the wait.
2515                    let old = state
2516                        .get_mut(guest_thread.thread)?
2517                        .wake_on_cancel
2518                        .replace(set);
2519                    if !old.is_none() {
2520                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2521                    }
2522                    let old = state
2523                        .get_mut(set)?
2524                        .waiting
2525                        .insert(guest_thread, WaitMode::Callback(self));
2526                    if !old.is_none() {
2527                        bail_bug!("set's waiting set already had this thread registered");
2528                    }
2529                }
2530                None
2531            }
2532            _ => bail!(Trap::UnsupportedCallbackCode),
2533        })
2534    }
2535
2536    /// Add the specified guest call to the "high priority" work item queue, to
2537    /// be started as soon as backpressure and/or reentrance rules allow.
2538    ///
2539    /// SAFETY: The raw pointer arguments must be valid references to guest
2540    /// functions (with the appropriate signatures) when the closures queued by
2541    /// this function are called.
2542    unsafe fn queue_call<T: 'static>(
2543        self,
2544        mut store: StoreContextMut<T>,
2545        guest_thread: QualifiedThreadId,
2546        callee: SendSyncPtr<VMFuncRef>,
2547        param_count: usize,
2548        result_count: usize,
2549        async_: bool,
2550        callback: Option<SendSyncPtr<VMFuncRef>>,
2551        post_return: Option<SendSyncPtr<VMFuncRef>>,
2552    ) -> Result<()> {
2553        /// Return a closure which will call the specified function in the scope
2554        /// of the specified task.
2555        ///
2556        /// This will use `GuestTask::lower_params` to lower the parameters, but
2557        /// will not lift the result; instead, it returns a
2558        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2559        /// any, may be lifted.  Note that an async-lifted export will have
2560        /// returned its result using the `task.return` intrinsic (or not
2561        /// returned a result at all, in the case of `task.cancel`), in which
2562        /// case the "result" of this call will either be a callback code or
2563        /// nothing.
2564        ///
2565        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2566        /// the returned closure is called.
2567        unsafe fn make_call<T: 'static>(
2568            store: StoreContextMut<T>,
2569            guest_thread: QualifiedThreadId,
2570            callee: SendSyncPtr<VMFuncRef>,
2571            param_count: usize,
2572            result_count: usize,
2573        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2574        + Send
2575        + Sync
2576        + 'static
2577        + use<T> {
2578            let token = StoreToken::new(store);
2579            move |store: &mut dyn VMStore| {
2580                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2581
2582                store
2583                    .concurrent_state_mut()?
2584                    .get_mut(guest_thread.thread)?
2585                    .state = GuestThreadState::Running;
2586                let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2587                let lower = match task.lower_params.take() {
2588                    Some(l) => l,
2589                    None => bail_bug!("lower_params missing"),
2590                };
2591
2592                lower(store, &mut storage[..param_count])?;
2593
2594                let mut store = token.as_context_mut(store);
2595
2596                // SAFETY: Per the contract documented in `make_call's`
2597                // documentation, `callee` must be a valid pointer.
2598                unsafe {
2599                    crate::Func::call_unchecked_raw(
2600                        &mut store,
2601                        callee.as_non_null(),
2602                        NonNull::new(
2603                            &mut storage[..param_count.max(result_count)]
2604                                as *mut [MaybeUninit<ValRaw>] as _,
2605                        )
2606                        .unwrap(),
2607                    )?;
2608                }
2609
2610                Ok(storage)
2611            }
2612        }
2613
2614        // SAFETY: Per the contract described in this function documentation,
2615        // the `callee` pointer which `call` closes over must be valid when
2616        // called by the closure we queue below.
2617        let call = unsafe {
2618            make_call(
2619                store.as_context_mut(),
2620                guest_thread,
2621                callee,
2622                param_count,
2623                result_count,
2624            )
2625        };
2626
2627        let callee_instance = store
2628            .0
2629            .concurrent_state_mut()?
2630            .get_mut(guest_thread.task)?
2631            .instance;
2632
2633        let fun = if callback.is_some() {
2634            assert!(async_);
2635
2636            Box::new(move |store: &mut dyn VMStore| {
2637                self.add_guest_thread_to_instance_table(
2638                    guest_thread.thread,
2639                    store,
2640                    callee_instance.index,
2641                )?;
2642                let old_thread = store.set_thread(guest_thread)?;
2643                log::trace!(
2644                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2645                );
2646
2647                store.enter_instance(callee_instance);
2648
2649                // SAFETY: See the documentation for `make_call` to review the
2650                // contract we must uphold for `call` here.
2651                //
2652                // Per the contract described in the `queue_call`
2653                // documentation, the `callee` pointer which `call` closes
2654                // over must be valid.
2655                let storage = call(store)?;
2656
2657                store.exit_instance(callee_instance)?;
2658
2659                store.set_thread(old_thread)?;
2660                let state = store.concurrent_state_mut()?;
2661                if let Some(t) = old_thread.guest() {
2662                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
2663                }
2664                log::trace!("stackless call: restored {old_thread:?} as current thread");
2665
2666                // SAFETY: `wasmparser` will have validated that the callback
2667                // function returns a `i32` result.
2668                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2669
2670                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2671            })
2672                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2673        } else {
2674            let token = StoreToken::new(store.as_context_mut());
2675            Box::new(move |store: &mut dyn VMStore| {
2676                self.add_guest_thread_to_instance_table(
2677                    guest_thread.thread,
2678                    store,
2679                    callee_instance.index,
2680                )?;
2681                let old_thread = store.set_thread(guest_thread)?;
2682                log::trace!(
2683                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2684                );
2685                let flags = self.id().get(store).instance_flags(callee_instance.index);
2686
2687                // Unless this is a callback-less (i.e. stackful)
2688                // async-lifted export, we need to record that the instance
2689                // cannot be entered until the call returns.
2690                if !async_ {
2691                    store.enter_instance(callee_instance);
2692                }
2693
2694                // SAFETY: See the documentation for `make_call` to review the
2695                // contract we must uphold for `call` here.
2696                //
2697                // Per the contract described in the `queue_call`
2698                // documentation, the `callee` pointer which `call` closes
2699                // over must be valid.
2700                let storage = call(store)?;
2701
2702                if !async_ {
2703                    // This is a sync-lifted export, so now is when we lift the
2704                    // result, optionally call the post-return function, if any,
2705                    // and finally notify any current or future waiters that the
2706                    // subtask has returned.
2707
2708                    let lift = {
2709                        store.exit_instance(callee_instance)?;
2710
2711                        let state = store.concurrent_state_mut()?;
2712                        if !state.get_mut(guest_thread.task)?.result.is_none() {
2713                            bail_bug!("task has already produced a result");
2714                        }
2715
2716                        match state.get_mut(guest_thread.task)?.lift_result.take() {
2717                            Some(lift) => lift,
2718                            None => bail_bug!("lift_result field is missing"),
2719                        }
2720                    };
2721
2722                    // SAFETY: `result_count` represents the number of core Wasm
2723                    // results returned, per `wasmparser`.
2724                    let result = (lift.lift)(store, unsafe {
2725                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2726                            &storage[..result_count],
2727                        )
2728                    })?;
2729
2730                    let post_return_arg = match result_count {
2731                        0 => ValRaw::i32(0),
2732                        // SAFETY: `result_count` represents the number of
2733                        // core Wasm results returned, per `wasmparser`.
2734                        1 => unsafe { storage[0].assume_init() },
2735                        _ => unreachable!(),
2736                    };
2737
2738                    unsafe {
2739                        call_post_return(
2740                            token.as_context_mut(store),
2741                            post_return.map(|v| v.as_non_null()),
2742                            post_return_arg,
2743                            flags,
2744                        )?;
2745                    }
2746
2747                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2748                }
2749
2750                store.set_thread(old_thread)?;
2751
2752                store
2753                    .concurrent_state_mut()?
2754                    .get_mut(guest_thread.task)?
2755                    .exited = true;
2756
2757                // This is a callback-less call, so the implicit thread has now completed
2758                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
2759                Ok(None)
2760            })
2761        };
2762
2763        store
2764            .0
2765            .concurrent_state_mut()?
2766            .push_high_priority(WorkItem::GuestCall(
2767                callee_instance.index,
2768                GuestCall {
2769                    thread: guest_thread,
2770                    kind: GuestCallKind::StartImplicit(fun),
2771                },
2772            ));
2773
2774        Ok(())
2775    }
2776
2777    /// Prepare (but do not start) a guest->guest call.
2778    ///
2779    /// This is called from fused adapter code generated in
2780    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2781    /// are synthesized Wasm functions which move the parameters from the caller
2782    /// to the callee and the result from the callee to the caller,
2783    /// respectively.  The adapter will call `Self::start_call` immediately
2784    /// after calling this function.
2785    ///
2786    /// SAFETY: All the pointer arguments must be valid pointers to guest
2787    /// entities (and with the expected signatures for the function references
2788    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2789    unsafe fn prepare_call<T: 'static>(
2790        self,
2791        mut store: StoreContextMut<T>,
2792        start: NonNull<VMFuncRef>,
2793        return_: NonNull<VMFuncRef>,
2794        caller_instance: RuntimeComponentInstanceIndex,
2795        callee_instance: RuntimeComponentInstanceIndex,
2796        task_return_type: TypeTupleIndex,
2797        callee_async: bool,
2798        memory: *mut VMMemoryDefinition,
2799        string_encoding: StringEncoding,
2800        caller_info: CallerInfo,
2801    ) -> Result<()> {
2802        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2803            // A task may only call an async-typed function via a sync lower if
2804            // it was created by a call to an async export.  Otherwise, we'll
2805            // trap.
2806            store.0.check_blocking()?;
2807        }
2808
2809        enum ResultInfo {
2810            Heap { results: u32 },
2811            Stack { result_count: u32 },
2812        }
2813
2814        let result_info = match &caller_info {
2815            CallerInfo::Async {
2816                has_result: true,
2817                params,
2818            } => ResultInfo::Heap {
2819                results: match params.last() {
2820                    Some(r) => r.get_u32(),
2821                    None => bail_bug!("retptr missing"),
2822                },
2823            },
2824            CallerInfo::Async {
2825                has_result: false, ..
2826            } => ResultInfo::Stack { result_count: 0 },
2827            CallerInfo::Sync {
2828                result_count,
2829                params,
2830            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2831                results: match params.last() {
2832                    Some(r) => r.get_u32(),
2833                    None => bail_bug!("arg ptr missing"),
2834                },
2835            },
2836            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2837                result_count: *result_count,
2838            },
2839        };
2840
2841        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2842
2843        // Create a new guest task for the call, closing over the `start` and
2844        // `return_` functions to lift the parameters and lower the result,
2845        // respectively.
2846        let start = SendSyncPtr::new(start);
2847        let return_ = SendSyncPtr::new(return_);
2848        let token = StoreToken::new(store.as_context_mut());
2849        let old_thread = store.0.current_guest_thread()?;
2850        let state = store.0.concurrent_state_mut()?;
2851
2852        debug_assert_eq!(
2853            state.get_mut(old_thread.task)?.instance,
2854            self.runtime_instance(caller_instance)
2855        );
2856
2857        let guest_thread = GuestTask::new(
2858            state,
2859            Box::new(move |store, dst| {
2860                let mut store = token.as_context_mut(store);
2861                assert!(dst.len() <= MAX_FLAT_PARAMS);
2862                // The `+ 1` here accounts for the return pointer, if any:
2863                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2864                let count = match caller_info {
2865                    // Async callers, if they have a result, use the last
2866                    // parameter as a return pointer so chop that off if
2867                    // relevant here.
2868                    CallerInfo::Async { params, has_result } => {
2869                        let params = &params[..params.len() - usize::from(has_result)];
2870                        for (param, src) in params.iter().zip(&mut src) {
2871                            src.write(*param);
2872                        }
2873                        params.len()
2874                    }
2875
2876                    // Sync callers forward everything directly.
2877                    CallerInfo::Sync { params, .. } => {
2878                        for (param, src) in params.iter().zip(&mut src) {
2879                            src.write(*param);
2880                        }
2881                        params.len()
2882                    }
2883                };
2884                // SAFETY: `start` is a valid `*mut VMFuncRef` from
2885                // `wasmtime-cranelift`-generated fused adapter code.  Based on
2886                // how it was constructed (see
2887                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2888                // for details) we know it takes count parameters and returns
2889                // `dst.len()` results.
2890                unsafe {
2891                    crate::Func::call_unchecked_raw(
2892                        &mut store,
2893                        start.as_non_null(),
2894                        NonNull::new(
2895                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2896                        )
2897                        .unwrap(),
2898                    )?;
2899                }
2900                dst.copy_from_slice(&src[..dst.len()]);
2901                let task = store.0.current_guest_thread()?.task;
2902                let state = store.0.concurrent_state_mut()?;
2903                Waitable::Guest(task).set_event(
2904                    state,
2905                    Some(Event::Subtask {
2906                        status: Status::Started,
2907                    }),
2908                )?;
2909                Ok(())
2910            }),
2911            LiftResult {
2912                lift: Box::new(move |store, src| {
2913                    // SAFETY: See comment in closure passed as `lower_params`
2914                    // parameter above.
2915                    let mut store = token.as_context_mut(store);
2916                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2917                    if let ResultInfo::Heap { results } = &result_info {
2918                        my_src.push(ValRaw::u32(*results));
2919                    }
2920
2921                    // Execute the `return_` hook, generated by Wasmtime's FACT
2922                    // compiler, in the context of the old thread. The old
2923                    // thread, this thread's caller, may have `realloc`
2924                    // callbacks invoked for example and those need the correct
2925                    // context set for the current thread.
2926                    let prev = store.0.set_thread(old_thread)?;
2927
2928                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2929                    // `wasmtime-cranelift`-generated fused adapter code.  Based
2930                    // on how it was constructed (see
2931                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2932                    // for details) we know it takes `src.len()` parameters and
2933                    // returns up to 1 result.
2934                    unsafe {
2935                        crate::Func::call_unchecked_raw(
2936                            &mut store,
2937                            return_.as_non_null(),
2938                            my_src.as_mut_slice().into(),
2939                        )?;
2940                    }
2941
2942                    // Restore the previous current thread after the
2943                    // lifting/lowering has returned.
2944                    store.0.set_thread(prev)?;
2945
2946                    let thread = store.0.current_guest_thread()?;
2947                    let state = store.0.concurrent_state_mut()?;
2948                    if sync_caller {
2949                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2950                            if let ResultInfo::Stack { result_count } = &result_info {
2951                                match result_count {
2952                                    0 => None,
2953                                    1 => Some(my_src[0]),
2954                                    _ => unreachable!(),
2955                                }
2956                            } else {
2957                                None
2958                            },
2959                        );
2960                    }
2961                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2962                }),
2963                ty: task_return_type,
2964                memory: NonNull::new(memory).map(SendSyncPtr::new),
2965                string_encoding,
2966            },
2967            Caller::Guest { thread: old_thread },
2968            None,
2969            self.runtime_instance(callee_instance),
2970            callee_async,
2971        )?;
2972
2973        // Make the new thread the current one so that `Self::start_call` knows
2974        // which one to start.
2975        store.0.set_thread(guest_thread)?;
2976        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");
2977
2978        Ok(())
2979    }
2980
2981    /// Call the specified callback function for an async-lifted export.
2982    ///
2983    /// SAFETY: `function` must be a valid reference to a guest function of the
2984    /// correct signature for a callback.
2985    unsafe fn call_callback<T>(
2986        self,
2987        mut store: StoreContextMut<T>,
2988        function: SendSyncPtr<VMFuncRef>,
2989        event: Event,
2990        handle: u32,
2991    ) -> Result<u32> {
2992        let (ordinal, result) = event.parts();
2993        let params = &mut [
2994            ValRaw::u32(ordinal),
2995            ValRaw::u32(handle),
2996            ValRaw::u32(result),
2997        ];
2998        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2999        // `wasmtime-cranelift`-generated fused adapter code or
3000        // `component::Options`.  Per `wasmparser` callback signature
3001        // validation, we know it takes three parameters and returns one.
3002        unsafe {
3003            crate::Func::call_unchecked_raw(
3004                &mut store,
3005                function.as_non_null(),
3006                params.as_mut_slice().into(),
3007            )?;
3008        }
3009        Ok(params[0].get_u32())
3010    }
3011
3012    /// Start a guest->guest call previously prepared using
3013    /// `Self::prepare_call`.
3014    ///
3015    /// This is called from fused adapter code generated in
3016    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
3017    /// this function immediately after calling `Self::prepare_call`.
3018    ///
3019    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
3020    /// functions with the appropriate signatures for the current guest task.
3021    /// If this is a call to an async-lowered import, the actual call may be
3022    /// deferred and run after this function returns, in which case the pointer
3023    /// arguments must also be valid when the call happens.
3024    unsafe fn start_call<T: 'static>(
3025        self,
3026        mut store: StoreContextMut<T>,
3027        callback: *mut VMFuncRef,
3028        post_return: *mut VMFuncRef,
3029        callee: NonNull<VMFuncRef>,
3030        param_count: u32,
3031        result_count: u32,
3032        flags: u32,
3033        storage: Option<&mut [MaybeUninit<ValRaw>]>,
3034    ) -> Result<u32> {
3035        let token = StoreToken::new(store.as_context_mut());
3036        let async_caller = storage.is_none();
3037        let guest_thread = store.0.current_guest_thread()?;
3038        let state = store.0.concurrent_state_mut()?;
3039        let callee_async = state.get_mut(guest_thread.task)?.async_function;
3040        let callee = SendSyncPtr::new(callee);
3041        let param_count = usize::try_from(param_count)?;
3042        assert!(param_count <= MAX_FLAT_PARAMS);
3043        let result_count = usize::try_from(result_count)?;
3044        assert!(result_count <= MAX_FLAT_RESULTS);
3045
3046        let task = state.get_mut(guest_thread.task)?;
3047        if let Some(callback) = NonNull::new(callback) {
3048            // We're calling an async-lifted export with a callback, so store
3049            // the callback and related context as part of the task so we can
3050            // call it later when needed.
3051            let callback = SendSyncPtr::new(callback);
3052            task.callback = Some(Box::new(move |store, event, handle| {
3053                let store = token.as_context_mut(store);
3054                unsafe { self.call_callback::<T>(store, callback, event, handle) }
3055            }));
3056        }
3057
3058        let Caller::Guest { thread: caller } = &task.caller else {
3059            // As of this writing, `start_call` is only used for guest->guest
3060            // calls.
3061            bail_bug!("start_call unexpectedly invoked for host->guest call");
3062        };
3063        let caller = *caller;
3064        let caller_instance = state.get_mut(caller.task)?.instance;
3065
3066        // Queue the call as a "high priority" work item.
3067        unsafe {
3068            self.queue_call(
3069                store.as_context_mut(),
3070                guest_thread,
3071                callee,
3072                param_count,
3073                result_count,
3074                (flags & START_FLAG_ASYNC_CALLEE) != 0,
3075                NonNull::new(callback).map(SendSyncPtr::new),
3076                NonNull::new(post_return).map(SendSyncPtr::new),
3077            )?;
3078        }
3079
3080        let state = store.0.concurrent_state_mut()?;
3081
3082        // Use the caller's `GuestThread::sync_call_set` to register interest in
3083        // the subtask...
3084        let guest_waitable = Waitable::Guest(guest_thread.task);
3085        let old_set = guest_waitable.common(state)?.set;
3086        let set = state.get_mut(caller.thread)?.sync_call_set;
3087        guest_waitable.join(state, Some(set))?;
3088
3089        store.0.set_thread(CurrentThread::None)?;
3090
3091        // ... and suspend this fiber temporarily while we wait for it to start.
3092        //
3093        // Note that we _could_ call the callee directly using the current fiber
3094        // rather than suspend this one, but that would make reasoning about the
3095        // event loop more complicated and is probably only worth doing if
3096        // there's a measurable performance benefit.  In addition, it would mean
3097        // blocking the caller if the callee calls a blocking sync-lowered
3098        // import, and as of this writing the spec says we must not do that.
3099        //
3100        // Alternatively, the fused adapter code could be modified to call the
3101        // callee directly without calling a host-provided intrinsic at all (in
3102        // which case it would need to do its own, inline backpressure checks,
3103        // etc.).  Again, we'd want to see a measurable performance benefit
3104        // before committing to such an optimization.  And again, we'd need to
3105        // update the spec to allow that.
3106        let (status, waitable) = loop {
3107            store.0.suspend(SuspendReason::Waiting {
3108                set,
3109                thread: caller,
3110                // Normally, `StoreOpaque::suspend` would assert it's being
3111                // called from a context where blocking is allowed.  However, if
3112                // `async_caller` is `true`, we'll only "block" long enough for
3113                // the callee to start, i.e. we won't repeat this loop, so we
3114                // tell `suspend` it's okay even if we're not allowed to block.
3115                // Alternatively, if the callee is not an async function, then
3116                // we know it won't block anyway.
3117                skip_may_block_check: async_caller || !callee_async,
3118            })?;
3119
3120            let state = store.0.concurrent_state_mut()?;
3121
3122            log::trace!("taking event for {:?}", guest_thread.task);
3123            let event = guest_waitable.take_event(state)?;
3124            let Some(Event::Subtask { status }) = event else {
3125                bail_bug!("subtasks should only get subtask events, got {event:?}")
3126            };
3127
3128            log::trace!("status {status:?} for {:?}", guest_thread.task);
3129
3130            if status == Status::Returned {
3131                // It returned, so we can stop waiting.
3132                break (status, None);
3133            } else if async_caller {
3134                // It hasn't returned yet, but the caller is calling via an
3135                // async-lowered import, so we generate a handle for the task
3136                // waitable and return the status.
3137                let handle = store
3138                    .0
3139                    .instance_state(caller_instance)
3140                    .handle_table()
3141                    .subtask_insert_guest(guest_thread.task.rep())?;
3142                store
3143                    .0
3144                    .concurrent_state_mut()?
3145                    .get_mut(guest_thread.task)?
3146                    .common
3147                    .handle = Some(handle);
3148                break (status, Some(handle));
3149            } else {
3150                // The callee hasn't returned yet, and the caller is calling via
3151                // a sync-lowered import, so we loop and keep waiting until the
3152                // callee returns.
3153            }
3154        };
3155
3156        guest_waitable.join(store.0.concurrent_state_mut()?, old_set)?;
3157
3158        // Reset the current thread to point to the caller as it resumes control.
3159        store.0.set_thread(caller)?;
3160        store
3161            .0
3162            .concurrent_state_mut()?
3163            .get_mut(caller.thread)?
3164            .state = GuestThreadState::Running;
3165        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
3166
3167        if let Some(storage) = storage {
3168            // The caller used a sync-lowered import to call an async-lifted
3169            // export, in which case the result, if any, has been stashed in
3170            // `GuestTask::sync_result`.
3171            let state = store.0.concurrent_state_mut()?;
3172            let task = state.get_mut(guest_thread.task)?;
3173            if let Some(result) = task.sync_result.take()? {
3174                if let Some(result) = result {
3175                    storage[0] = MaybeUninit::new(result);
3176                }
3177
3178                if task.exited && task.ready_to_delete() {
3179                    Waitable::Guest(guest_thread.task).delete_from(state)?;
3180                }
3181            }
3182        }
3183
3184        Ok(status.pack(waitable))
3185    }
3186
3187    /// Poll the specified future once on behalf of a guest->host call using an
3188    /// async-lowered import.
3189    ///
3190    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
3191    /// `Pending`, add it to the set of futures to be polled as part of this
3192    /// instance's event loop until it completes, and then return
3193    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
3194    ///
3195    /// Whether the future returns `Ready` immediately or later, the `lower`
3196    /// function will be used to lower the result, if any, into the guest caller's
3197    /// stack and linear memory. The `lower` function is invoked with `None` if
3198    /// the future is cancelled.
3199    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
3200        self,
3201        mut store: StoreContextMut<'_, T>,
3202        future: impl Future<Output = Result<R>> + Send + 'static,
3203        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
3204    ) -> Result<Option<u32>> {
3205        let token = StoreToken::new(store.as_context_mut());
3206        let task = store.0.current_host_thread()?;
3207        let state = store.0.concurrent_state_mut()?;
3208
3209        // Create an abortable future which hooks calls to poll and manages call
3210        // context state for the future.
3211        let (join_handle, future) = JoinHandle::run(future);
3212        {
3213            let state = &mut state.get_mut(task)?.state;
3214            assert!(matches!(state, HostTaskState::CalleeStarted));
3215            *state = HostTaskState::CalleeRunning(join_handle);
3216        }
3217
3218        let mut future = Box::pin(future);
3219
3220        // Finally, poll the future.  We can use a dummy `Waker` here because
3221        // we'll add the future to `ConcurrentState::futures` and poll it
3222        // automatically from the event loop if it doesn't complete immediately
3223        // here.
3224        let poll = tls::set(store.0, || {
3225            future
3226                .as_mut()
3227                .poll(&mut Context::from_waker(&Waker::noop()))
3228        });
3229
3230        match poll {
3231            // It finished immediately; lower the result and delete the task.
3232            Poll::Ready(result) => {
3233                let result = result.transpose()?;
3234                lower(store.as_context_mut(), result)?;
3235                return Ok(None);
3236            }
3237
3238            // Future isn't ready yet, so fall through.
3239            Poll::Pending => {}
3240        }
3241
3242        // It hasn't finished yet; add the future to
3243        // `ConcurrentState::futures` so it will be polled by the event
3244        // loop and allocate a waitable handle to return to the guest.
3245
3246        // Wrap the future in a closure responsible for lowering the result into
3247        // the guest's stack and memory, as well as notifying any waiters that
3248        // the task returned.
3249        let future = Box::pin(async move {
3250            let result = match future.await {
3251                Some(result) => Some(result?),
3252                None => None,
3253            };
3254            let on_complete = move |store: &mut dyn VMStore| {
3255                // Restore the `current_thread` to be the host so `lower` knows
3256                // how to manipulate borrows and knows which scope of borrows
3257                // to check.
3258                let mut store = token.as_context_mut(store);
3259                let old = store.0.set_thread(task)?;
3260
3261                let status = if result.is_some() {
3262                    Status::Returned
3263                } else {
3264                    Status::ReturnCancelled
3265                };
3266
3267                lower(store.as_context_mut(), result)?;
3268                let state = store.0.concurrent_state_mut()?;
3269                match &mut state.get_mut(task)?.state {
3270                    // The task is already flagged as finished because it was
3271                    // cancelled. No need to transition further.
3272                    HostTaskState::CalleeDone { .. } => {}
3273
3274                    // Otherwise transition this task to the done state.
3275                    other => *other = HostTaskState::CalleeDone { cancelled: false },
3276                }
3277                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
3278
3279                store.0.set_thread(old)?;
3280                Ok(())
3281            };
3282
3283            // Here we schedule a task to run on a worker fiber to do the
3284            // lowering since it may involve a call to the guest's realloc
3285            // function. This is necessary because calling the guest while
3286            // there are host embedder frames on the stack is unsound.
3287            tls::get(move |store| {
3288                store
3289                    .concurrent_state_mut()?
3290                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
3291                        on_complete,
3292                    ))));
3293                Ok(())
3294            })
3295        });
3296
3297        // Make this task visible to the guest and then record what it
3298        // was made visible as.
3299        let state = store.0.concurrent_state_mut()?;
3300        state.push_future(future);
3301        let caller = state.get_mut(task)?.caller;
3302        let instance = state.get_mut(caller.task)?.instance;
3303        let handle = store
3304            .0
3305            .instance_state(instance)
3306            .handle_table()
3307            .subtask_insert_host(task.rep())?;
3308        store.0.concurrent_state_mut()?.get_mut(task)?.common.handle = Some(handle);
3309        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
3310
3311        // Restore the currently running thread to this host task's
3312        // caller. Note that the host task isn't deallocated as it's
3313        // within the store and will get deallocated later.
3314        store.0.set_thread(caller)?;
3315        Ok(Some(handle))
3316    }
3317
3318    /// Implements the `task.return` intrinsic, lifting the result for the
3319    /// current guest task.
3320    pub(crate) fn task_return(
3321        self,
3322        store: &mut dyn VMStore,
3323        ty: TypeTupleIndex,
3324        options: OptionsIndex,
3325        storage: &[ValRaw],
3326    ) -> Result<()> {
3327        let guest_thread = store.current_guest_thread()?;
3328        let state = store.concurrent_state_mut()?;
3329        let lift = state
3330            .get_mut(guest_thread.task)?
3331            .lift_result
3332            .take()
3333            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3334        if !state.get_mut(guest_thread.task)?.result.is_none() {
3335            bail_bug!("task result unexpectedly already set");
3336        }
3337
3338        let CanonicalOptions {
3339            string_encoding,
3340            data_model,
3341            ..
3342        } = &self.id().get(store).component().env_component().options[options];
3343
3344        let invalid = ty != lift.ty
3345            || string_encoding != &lift.string_encoding
3346            || match data_model {
3347                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3348                    Some(memory) => {
3349                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3350                        let actual = self.id().get(store).runtime_memory(memory);
3351                        expected != actual.as_ptr()
3352                    }
3353                    // Memory not specified, meaning it didn't need to be
3354                    // specified per validation, so not invalid.
3355                    None => false,
3356                },
3357                // Always invalid as this isn't supported.
3358                CanonicalOptionsDataModel::Gc { .. } => true,
3359            };
3360
3361        if invalid {
3362            bail!(Trap::TaskReturnInvalid);
3363        }
3364
3365        log::trace!("task.return for {guest_thread:?}");
3366
3367        let result = (lift.lift)(store, storage)?;
3368        self.task_complete(store, guest_thread.task, result, Status::Returned)
3369    }
3370
3371    /// Implements the `task.cancel` intrinsic.
3372    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3373        let guest_thread = store.current_guest_thread()?;
3374        let state = store.concurrent_state_mut()?;
3375        let task = state.get_mut(guest_thread.task)?;
3376        if !task.cancel_sent {
3377            bail!(Trap::TaskCancelNotCancelled);
3378        }
3379        _ = task
3380            .lift_result
3381            .take()
3382            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3383
3384        if !task.result.is_none() {
3385            bail_bug!("task result should not bet set yet");
3386        }
3387
3388        log::trace!("task.cancel for {guest_thread:?}");
3389
3390        self.task_complete(
3391            store,
3392            guest_thread.task,
3393            Box::new(DummyResult),
3394            Status::ReturnCancelled,
3395        )
3396    }
3397
3398    /// Complete the specified guest task (i.e. indicate that it has either
3399    /// returned a (possibly empty) result or cancelled itself).
3400    ///
3401    /// This will return any resource borrows and notify any current or future
3402    /// waiters that the task has completed.
3403    fn task_complete(
3404        self,
3405        store: &mut StoreOpaque,
3406        guest_task: TableId<GuestTask>,
3407        result: Box<dyn Any + Send + Sync>,
3408        status: Status,
3409    ) -> Result<()> {
3410        store
3411            .component_resource_tables(Some(self))?
3412            .validate_scope_exit()?;
3413
3414        let state = store.concurrent_state_mut()?;
3415        let task = state.get_mut(guest_task)?;
3416
3417        if let Caller::Host { tx, .. } = &mut task.caller {
3418            if let Some(tx) = tx.take() {
3419                _ = tx.send(result);
3420            }
3421        } else {
3422            task.result = Some(result);
3423            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3424        }
3425
3426        Ok(())
3427    }
3428
3429    /// Implements the `waitable-set.new` intrinsic.
3430    pub(crate) fn waitable_set_new(
3431        self,
3432        store: &mut StoreOpaque,
3433        caller_instance: RuntimeComponentInstanceIndex,
3434    ) -> Result<u32> {
3435        let set = store.concurrent_state_mut()?.push(WaitableSet::default())?;
3436        let handle = store
3437            .instance_state(self.runtime_instance(caller_instance))
3438            .handle_table()
3439            .waitable_set_insert(set.rep())?;
3440        log::trace!("new waitable set {set:?} (handle {handle})");
3441        Ok(handle)
3442    }
3443
3444    /// Implements the `waitable-set.drop` intrinsic.
3445    pub(crate) fn waitable_set_drop(
3446        self,
3447        store: &mut StoreOpaque,
3448        caller_instance: RuntimeComponentInstanceIndex,
3449        set: u32,
3450    ) -> Result<()> {
3451        let rep = store
3452            .instance_state(self.runtime_instance(caller_instance))
3453            .handle_table()
3454            .waitable_set_remove(set)?;
3455
3456        log::trace!("drop waitable set {rep} (handle {set})");
3457
3458        // Note that we're careful to check for waiters _before_ deleting the
3459        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3460        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3461        if !store
3462            .concurrent_state_mut()?
3463            .get_mut(TableId::<WaitableSet>::new(rep))?
3464            .waiting
3465            .is_empty()
3466        {
3467            bail!(Trap::WaitableSetDropHasWaiters);
3468        }
3469
3470        store
3471            .concurrent_state_mut()?
3472            .delete(TableId::<WaitableSet>::new(rep))?;
3473
3474        Ok(())
3475    }
3476
3477    /// Implements the `waitable.join` intrinsic.
3478    pub(crate) fn waitable_join(
3479        self,
3480        store: &mut StoreOpaque,
3481        caller_instance: RuntimeComponentInstanceIndex,
3482        waitable_handle: u32,
3483        set_handle: u32,
3484    ) -> Result<()> {
3485        let mut instance = self.id().get_mut(store);
3486        let waitable =
3487            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3488
3489        let set = if set_handle == 0 {
3490            None
3491        } else {
3492            let set = instance.instance_states().0[caller_instance]
3493                .handle_table()
3494                .waitable_set_rep(set_handle)?;
3495
3496            let state = store.concurrent_state_mut()?;
3497            if let Some(old) = waitable.common(state)?.set
3498                && state.get_mut(old)?.is_sync_call_set
3499            {
3500                bail!(Trap::WaitableSyncAndAsync);
3501            }
3502
3503            Some(TableId::<WaitableSet>::new(set))
3504        };
3505
3506        log::trace!(
3507            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3508        );
3509
3510        waitable.join(store.concurrent_state_mut()?, set)
3511    }
3512
3513    /// Implements the `subtask.drop` intrinsic.
3514    pub(crate) fn subtask_drop(
3515        self,
3516        store: &mut StoreOpaque,
3517        caller_instance: RuntimeComponentInstanceIndex,
3518        task_id: u32,
3519    ) -> Result<()> {
3520        self.waitable_join(store, caller_instance, task_id, 0)?;
3521
3522        let (rep, is_host) = store
3523            .instance_state(self.runtime_instance(caller_instance))
3524            .handle_table()
3525            .subtask_remove(task_id)?;
3526
3527        let concurrent_state = store.concurrent_state_mut()?;
3528        let (waitable, delete) = if is_host {
3529            let id = TableId::<HostTask>::new(rep);
3530            let task = concurrent_state.get_mut(id)?;
3531            match &task.state {
3532                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3533                HostTaskState::CalleeDone { .. } => {}
3534                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3535                    bail_bug!("invalid state for callee in `subtask.drop`")
3536                }
3537            }
3538            (Waitable::Host(id), true)
3539        } else {
3540            let id = TableId::<GuestTask>::new(rep);
3541            let task = concurrent_state.get_mut(id)?;
3542            if task.lift_result.is_some() {
3543                bail!(Trap::SubtaskDropNotResolved);
3544            }
3545            (
3546                Waitable::Guest(id),
3547                concurrent_state.get_mut(id)?.ready_to_delete(),
3548            )
3549        };
3550
3551        waitable.common(concurrent_state)?.handle = None;
3552
3553        // If this subtask has an event that means that the terminal status of
3554        // this subtask wasn't yet received so it can't be dropped yet.
3555        if waitable.take_event(concurrent_state)?.is_some() {
3556            bail!(Trap::SubtaskDropNotResolved);
3557        }
3558
3559        if delete {
3560            waitable.delete_from(concurrent_state)?;
3561        }
3562
3563        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3564        Ok(())
3565    }
3566
3567    /// Implements the `waitable-set.wait` intrinsic.
3568    pub(crate) fn waitable_set_wait(
3569        self,
3570        store: &mut StoreOpaque,
3571        options: OptionsIndex,
3572        set: u32,
3573        payload: u32,
3574    ) -> Result<u32> {
3575        if !self.options(store, options).async_ {
3576            // The caller may only call `waitable-set.wait` from an async task
3577            // (i.e. a task created via a call to an async export).
3578            // Otherwise, we'll trap.
3579            store.check_blocking()?;
3580        }
3581
3582        let &CanonicalOptions {
3583            cancellable,
3584            instance: caller_instance,
3585            ..
3586        } = &self.id().get(store).component().env_component().options[options];
3587        let rep = store
3588            .instance_state(self.runtime_instance(caller_instance))
3589            .handle_table()
3590            .waitable_set_rep(set)?;
3591
3592        self.waitable_check(
3593            store,
3594            cancellable,
3595            WaitableCheck::Wait,
3596            WaitableCheckParams {
3597                set: TableId::new(rep),
3598                options,
3599                payload,
3600            },
3601        )
3602    }
3603
3604    /// Implements the `waitable-set.poll` intrinsic.
3605    pub(crate) fn waitable_set_poll(
3606        self,
3607        store: &mut StoreOpaque,
3608        options: OptionsIndex,
3609        set: u32,
3610        payload: u32,
3611    ) -> Result<u32> {
3612        let &CanonicalOptions {
3613            cancellable,
3614            instance: caller_instance,
3615            ..
3616        } = &self.id().get(store).component().env_component().options[options];
3617        let rep = store
3618            .instance_state(self.runtime_instance(caller_instance))
3619            .handle_table()
3620            .waitable_set_rep(set)?;
3621
3622        self.waitable_check(
3623            store,
3624            cancellable,
3625            WaitableCheck::Poll,
3626            WaitableCheckParams {
3627                set: TableId::new(rep),
3628                options,
3629                payload,
3630            },
3631        )
3632    }
3633
3634    /// Implements the `thread.index` intrinsic.
3635    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3636        let thread_id = store.current_guest_thread()?.thread;
3637        match store
3638            .concurrent_state_mut()?
3639            .get_mut(thread_id)?
3640            .instance_rep
3641        {
3642            Some(r) => Ok(r),
3643            None => bail_bug!("thread should have instance_rep by now"),
3644        }
3645    }
3646
3647    /// Implements the `thread.new-indirect` intrinsic.
3648    pub(crate) fn thread_new_indirect<T: 'static>(
3649        self,
3650        mut store: StoreContextMut<T>,
3651        runtime_instance: RuntimeComponentInstanceIndex,
3652        _func_ty_idx: TypeFuncIndex, // currently unused
3653        start_func_table_idx: RuntimeTableIndex,
3654        start_func_idx: u32,
3655        context: i32,
3656    ) -> Result<u32> {
3657        log::trace!("creating new thread");
3658
3659        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3660        let (instance, registry) = self.id().get_mut_and_registry(store.0);
3661        let callee = instance
3662            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3663            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3664        if callee.type_index(store.0) != start_func_ty.type_index() {
3665            bail!(Trap::ThreadNewIndirectInvalidType);
3666        }
3667
3668        let token = StoreToken::new(store.as_context_mut());
3669        let start_func = Box::new(
3670            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3671                let old_thread = store.set_thread(guest_thread)?;
3672                log::trace!(
3673                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3674                );
3675
3676                let mut store = token.as_context_mut(store);
3677                let mut params = [ValRaw::i32(context)];
3678                // Use call_unchecked rather than call or call_async, as we don't want to run the function
3679                // on a separate fiber if we're running in an async store.
3680                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3681
3682                store.0.set_thread(old_thread)?;
3683
3684                store.0.cleanup_thread(
3685                    guest_thread,
3686                    self.runtime_instance(runtime_instance),
3687                    CleanupTask::Yes,
3688                )?;
3689                log::trace!("explicit thread {guest_thread:?} completed");
3690                let state = store.0.concurrent_state_mut()?;
3691                if let Some(t) = old_thread.guest() {
3692                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
3693                }
3694                log::trace!("thread start: restored {old_thread:?} as current thread");
3695
3696                Ok(())
3697            },
3698        );
3699
3700        let current_thread = store.0.current_guest_thread()?;
3701        let state = store.0.concurrent_state_mut()?;
3702        let parent_task = current_thread.task;
3703
3704        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3705        let thread_id = state.push(new_thread)?;
3706        state.get_mut(parent_task)?.threads.insert(thread_id);
3707
3708        log::trace!("new thread with id {thread_id:?} created");
3709
3710        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3711    }
3712
3713    pub(crate) fn resume_thread(
3714        self,
3715        store: &mut StoreOpaque,
3716        runtime_instance: RuntimeComponentInstanceIndex,
3717        thread_idx: u32,
3718        high_priority: bool,
3719        allow_ready: bool,
3720    ) -> Result<()> {
3721        let thread_id =
3722            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3723        let state = store.concurrent_state_mut()?;
3724        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3725        let thread = state.get_mut(guest_thread.thread)?;
3726
3727        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3728            GuestThreadState::NotStartedExplicit(start_func) => {
3729                log::trace!("starting thread {guest_thread:?}");
3730                let guest_call = WorkItem::GuestCall(
3731                    runtime_instance,
3732                    GuestCall {
3733                        thread: guest_thread,
3734                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3735                            start_func(store, guest_thread)
3736                        })),
3737                    },
3738                );
3739                store
3740                    .concurrent_state_mut()?
3741                    .push_work_item(guest_call, high_priority);
3742            }
3743            GuestThreadState::Suspended(fiber) => {
3744                log::trace!("resuming thread {thread_id:?} that was suspended");
3745                store
3746                    .concurrent_state_mut()?
3747                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3748            }
3749            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3750                log::trace!("resuming thread {thread_id:?} that was ready");
3751                thread.state = GuestThreadState::Ready { fiber, cancellable };
3752                store
3753                    .concurrent_state_mut()?
3754                    .promote_thread_work_item(guest_thread);
3755            }
3756            other => {
3757                thread.state = other;
3758                bail!(Trap::CannotResumeThread);
3759            }
3760        }
3761        Ok(())
3762    }
3763
3764    fn add_guest_thread_to_instance_table(
3765        self,
3766        thread_id: TableId<GuestThread>,
3767        store: &mut StoreOpaque,
3768        runtime_instance: RuntimeComponentInstanceIndex,
3769    ) -> Result<u32> {
3770        let guest_id = store
3771            .instance_state(self.runtime_instance(runtime_instance))
3772            .thread_handle_table()
3773            .guest_thread_insert(thread_id.rep())?;
3774        store
3775            .concurrent_state_mut()?
3776            .get_mut(thread_id)?
3777            .instance_rep = Some(guest_id);
3778        Ok(guest_id)
3779    }
3780
3781    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3782    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3783    pub(crate) fn suspension_intrinsic(
3784        self,
3785        store: &mut StoreOpaque,
3786        caller: RuntimeComponentInstanceIndex,
3787        cancellable: bool,
3788        yielding: bool,
3789        to_thread: SuspensionTarget,
3790    ) -> Result<WaitResult> {
3791        let guest_thread = store.current_guest_thread()?;
3792        if to_thread.is_none() {
3793            let state = store.concurrent_state_mut()?;
3794            if yielding {
3795                // This is a `thread.yield` call
3796                if !state.may_block(guest_thread.task)? {
3797                    // In a non-blocking context, a `thread.yield` may trigger
3798                    // other threads in the same component instance to run.
3799                    if !state.promote_instance_local_thread_work_item(caller) {
3800                        // No other threads are runnable, so just return
3801                        return Ok(WaitResult::Completed);
3802                    }
3803                }
3804            } else {
3805                // The caller may only call `thread.suspend` from an async task
3806                // (i.e. a task created via a call to an async export).
3807                // Otherwise, we'll trap.
3808                store.check_blocking()?;
3809            }
3810        }
3811
3812        // There could be a pending cancellation from a previous uncancellable wait
3813        if cancellable && store.take_pending_cancellation()? {
3814            return Ok(WaitResult::Cancelled);
3815        }
3816
3817        match to_thread {
3818            SuspensionTarget::SomeSuspended(thread) => {
3819                self.resume_thread(store, caller, thread, true, false)?
3820            }
3821            SuspensionTarget::Some(thread) => {
3822                self.resume_thread(store, caller, thread, true, true)?
3823            }
3824            SuspensionTarget::None => { /* nothing to do */ }
3825        }
3826
3827        let reason = if yielding {
3828            SuspendReason::Yielding {
3829                thread: guest_thread,
3830                cancellable,
3831                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3832                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3833                // panic if we called it in a non-blocking context.
3834                skip_may_block_check: to_thread.is_some(),
3835            }
3836        } else {
3837            SuspendReason::ExplicitlySuspending {
3838                thread: guest_thread,
3839                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3840                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3841                // panic if we called it in a non-blocking context.
3842                skip_may_block_check: to_thread.is_some(),
3843            }
3844        };
3845
3846        store.suspend(reason)?;
3847
3848        if cancellable && store.take_pending_cancellation()? {
3849            Ok(WaitResult::Cancelled)
3850        } else {
3851            Ok(WaitResult::Completed)
3852        }
3853    }
3854
3855    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3856    fn waitable_check(
3857        self,
3858        store: &mut StoreOpaque,
3859        cancellable: bool,
3860        check: WaitableCheck,
3861        params: WaitableCheckParams,
3862    ) -> Result<u32> {
3863        let guest_thread = store.current_guest_thread()?;
3864
3865        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3866
3867        let state = store.concurrent_state_mut()?;
3868        let task = state.get_mut(guest_thread.task)?;
3869
3870        // If we're waiting, and there are no events immediately available,
3871        // suspend the fiber until that changes.
3872        match &check {
3873            WaitableCheck::Wait => {
3874                let set = params.set;
3875
3876                if (task.event.is_none()
3877                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3878                    && state.get_mut(set)?.ready.is_empty()
3879                {
3880                    if cancellable {
3881                        let old = state
3882                            .get_mut(guest_thread.thread)?
3883                            .wake_on_cancel
3884                            .replace(set);
3885                        if !old.is_none() {
3886                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3887                        }
3888                    }
3889
3890                    store.suspend(SuspendReason::Waiting {
3891                        set,
3892                        thread: guest_thread,
3893                        skip_may_block_check: false,
3894                    })?;
3895                }
3896            }
3897            WaitableCheck::Poll => {}
3898        }
3899
3900        log::trace!(
3901            "waitable check for {guest_thread:?}; set {:?}, part two",
3902            params.set
3903        );
3904
3905        // Deliver any pending events to the guest and return.
3906        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3907
3908        let (ordinal, handle, result) = match &check {
3909            WaitableCheck::Wait => {
3910                let (event, waitable) = match event {
3911                    Some(p) => p,
3912                    None => bail_bug!("event expected to be present"),
3913                };
3914                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3915                let (ordinal, result) = event.parts();
3916                (ordinal, handle, result)
3917            }
3918            WaitableCheck::Poll => {
3919                if let Some((event, waitable)) = event {
3920                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3921                    let (ordinal, result) = event.parts();
3922                    (ordinal, handle, result)
3923                } else {
3924                    log::trace!(
3925                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3926                        guest_thread.task,
3927                        params.set
3928                    );
3929                    let (ordinal, result) = Event::None.parts();
3930                    (ordinal, 0, result)
3931                }
3932            }
3933        };
3934        let memory = self.options_memory_mut(store, params.options);
3935        let ptr = crate::component::func::validate_inbounds_dynamic(
3936            &CanonicalAbiInfo::POINTER_PAIR,
3937            memory,
3938            &ValRaw::u32(params.payload),
3939        )?;
3940        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3941        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3942        Ok(ordinal)
3943    }
3944
3945    /// Implements the `subtask.cancel` intrinsic.
3946    pub(crate) fn subtask_cancel(
3947        self,
3948        store: &mut StoreOpaque,
3949        caller_instance: RuntimeComponentInstanceIndex,
3950        async_: bool,
3951        task_id: u32,
3952    ) -> Result<u32> {
3953        if !async_ {
3954            // The caller may only sync call `subtask.cancel` from an async task
3955            // (i.e. a task created via a call to an async export).  Otherwise,
3956            // we'll trap.
3957            store.check_blocking()?;
3958        }
3959
3960        let (rep, is_host) = store
3961            .instance_state(self.runtime_instance(caller_instance))
3962            .handle_table()
3963            .subtask_rep(task_id)?;
3964        let waitable = if is_host {
3965            Waitable::Host(TableId::<HostTask>::new(rep))
3966        } else {
3967            Waitable::Guest(TableId::<GuestTask>::new(rep))
3968        };
3969        let concurrent_state = store.concurrent_state_mut()?;
3970
3971        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3972
3973        let needs_block;
3974        if let Waitable::Host(host_task) = waitable {
3975            let state = &mut concurrent_state.get_mut(host_task)?.state;
3976            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3977                // If the callee is still running, signal an abort is requested.
3978                //
3979                // After cancelling this falls through to block waiting for the
3980                // host task to actually finish assuming that `async_` is false.
3981                // This blocking behavior resolves the race of `handle.abort()`
3982                // with the task actually getting cancelled or finishing.
3983                HostTaskState::CalleeRunning(handle) => {
3984                    handle.abort();
3985                    needs_block = true;
3986                }
3987
3988                // Cancellation was already requested, so fail as the task can't
3989                // be cancelled twice.
3990                HostTaskState::CalleeDone { cancelled } => {
3991                    if cancelled {
3992                        bail!(Trap::SubtaskCancelAfterTerminal);
3993                    } else {
3994                        // The callee is already done so there's no need to
3995                        // block further for an event.
3996                        needs_block = false;
3997                    }
3998                }
3999
4000                // These states should not be possible for a subtask that's
4001                // visible from the guest, so trap here.
4002                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
4003                    bail_bug!("invalid states for host callee")
4004                }
4005            }
4006        } else {
4007            let guest_task = TableId::<GuestTask>::new(rep);
4008            let task = concurrent_state.get_mut(guest_task)?;
4009            if !task.already_lowered_parameters() {
4010                store.cancel_guest_subtask_without_lowered_parameters(
4011                    self.runtime_instance(caller_instance),
4012                    guest_task,
4013                )?;
4014                return Ok(Status::StartCancelled as u32);
4015            } else if !task.returned_or_cancelled() {
4016                // Started, but not yet returned or cancelled; send the
4017                // `CANCELLED` event
4018                task.cancel_sent = true;
4019                // Note that this might overwrite an event that was set earlier
4020                // (e.g. `Event::None` if the task is yielding, or
4021                // `Event::Cancelled` if it was already cancelled), but that's
4022                // okay -- this should supersede the previous state.
4023                task.event = Some(Event::Cancelled);
4024                let runtime_instance = task.instance.index;
4025                for thread in task.threads.clone() {
4026                    let thread = QualifiedThreadId {
4027                        task: guest_task,
4028                        thread,
4029                    };
4030                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
4031                    if let Some(set) = thread_mut.wake_on_cancel.take() {
4032                        // The thread is in a cancellable wait, so wake it up:
4033                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
4034                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
4035                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
4036                                runtime_instance,
4037                                GuestCall {
4038                                    thread,
4039                                    kind: GuestCallKind::DeliverEvent {
4040                                        instance,
4041                                        set: None,
4042                                    },
4043                                },
4044                            ),
4045                            None => bail_bug!("thread not present in wake_on_cancel set"),
4046                        };
4047                        concurrent_state.push_high_priority(item);
4048
4049                        let caller = store.current_guest_thread()?;
4050                        store.suspend(SuspendReason::Yielding {
4051                            thread: caller,
4052                            cancellable: false,
4053                            // `subtask.cancel` is not allowed to be called in a
4054                            // sync context, so we cannot skip the may-block check.
4055                            skip_may_block_check: false,
4056                        })?;
4057                        break;
4058                    } else if let GuestThreadState::Ready {
4059                        cancellable: true, ..
4060                    } = &thread_mut.state
4061                    {
4062                        // The thread is in a cancellable yield, so yield back
4063                        // to it.
4064                        concurrent_state.promote_thread_work_item(thread);
4065                        let caller = store.current_guest_thread()?;
4066                        store.suspend(SuspendReason::Yielding {
4067                            thread: caller,
4068                            cancellable: false,
4069                            skip_may_block_check: false,
4070                        })?;
4071                        break;
4072                    }
4073                }
4074
4075                // Guest tasks need to block if they have not yet returned or
4076                // cancelled, even as a result of the event delivery above.
4077                needs_block = !store
4078                    .concurrent_state_mut()?
4079                    .get_mut(guest_task)?
4080                    .returned_or_cancelled()
4081            } else {
4082                needs_block = false;
4083            }
4084        };
4085
4086        // If we need to block waiting on the terminal status of this subtask
4087        // then return immediately in `async` mode, or otherwise wait for the
4088        // event to get signaled through the store.
4089        if needs_block {
4090            if async_ {
4091                return Ok(BLOCKED);
4092            }
4093
4094            // Wait for this waitable to get signaled with its terminal status
4095            // from the completion callback enqueued by `first_poll`. Once
4096            // that's done fall through to the sahred
4097            store.wait_for_event(waitable)?;
4098
4099            // .. fall through to determine what event's in store for us.
4100        }
4101
4102        let event = waitable.take_event(store.concurrent_state_mut()?)?;
4103        if let Some(Event::Subtask {
4104            status: status @ (Status::Returned | Status::ReturnCancelled),
4105        }) = event
4106        {
4107            Ok(status as u32)
4108        } else {
4109            bail!(Trap::SubtaskCancelAfterTerminal);
4110        }
4111    }
4112}
4113
4114/// Trait representing component model ABI async intrinsics and fused adapter
4115/// helper functions.
4116///
4117/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
4118/// which must be valid for at least the duration of the call (and possibly for
4119/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
4120/// pointers used for async calls).
4121pub trait VMComponentAsyncStore {
4122    /// A helper function for fused adapter modules involving calls where the
4123    /// one of the caller or callee is async.
4124    ///
4125    /// This helper is not used when the caller and callee both use the sync
4126    /// ABI, only when at least one is async is this used.
4127    unsafe fn prepare_call(
4128        &mut self,
4129        instance: Instance,
4130        memory: *mut VMMemoryDefinition,
4131        start: NonNull<VMFuncRef>,
4132        return_: NonNull<VMFuncRef>,
4133        caller_instance: RuntimeComponentInstanceIndex,
4134        callee_instance: RuntimeComponentInstanceIndex,
4135        task_return_type: TypeTupleIndex,
4136        callee_async: bool,
4137        string_encoding: StringEncoding,
4138        result_count: u32,
4139        storage: *mut ValRaw,
4140        storage_len: usize,
4141    ) -> Result<()>;
4142
4143    /// A helper function for fused adapter modules involving calls where the
4144    /// caller is sync-lowered but the callee is async-lifted.
4145    unsafe fn sync_start(
4146        &mut self,
4147        instance: Instance,
4148        callback: *mut VMFuncRef,
4149        callee: NonNull<VMFuncRef>,
4150        param_count: u32,
4151        storage: *mut MaybeUninit<ValRaw>,
4152        storage_len: usize,
4153    ) -> Result<()>;
4154
4155    /// A helper function for fused adapter modules involving calls where the
4156    /// caller is async-lowered.
4157    unsafe fn async_start(
4158        &mut self,
4159        instance: Instance,
4160        callback: *mut VMFuncRef,
4161        post_return: *mut VMFuncRef,
4162        callee: NonNull<VMFuncRef>,
4163        param_count: u32,
4164        result_count: u32,
4165        flags: u32,
4166    ) -> Result<u32>;
4167
4168    /// The `future.write` intrinsic.
4169    fn future_write(
4170        &mut self,
4171        instance: Instance,
4172        caller: RuntimeComponentInstanceIndex,
4173        ty: TypeFutureTableIndex,
4174        options: OptionsIndex,
4175        future: u32,
4176        address: u32,
4177    ) -> Result<u32>;
4178
4179    /// The `future.read` intrinsic.
4180    fn future_read(
4181        &mut self,
4182        instance: Instance,
4183        caller: RuntimeComponentInstanceIndex,
4184        ty: TypeFutureTableIndex,
4185        options: OptionsIndex,
4186        future: u32,
4187        address: u32,
4188    ) -> Result<u32>;
4189
4190    /// The `future.drop-writable` intrinsic.
4191    fn future_drop_writable(
4192        &mut self,
4193        instance: Instance,
4194        ty: TypeFutureTableIndex,
4195        writer: u32,
4196    ) -> Result<()>;
4197
4198    /// The `stream.write` intrinsic.
4199    fn stream_write(
4200        &mut self,
4201        instance: Instance,
4202        caller: RuntimeComponentInstanceIndex,
4203        ty: TypeStreamTableIndex,
4204        options: OptionsIndex,
4205        stream: u32,
4206        address: u32,
4207        count: u32,
4208    ) -> Result<u32>;
4209
4210    /// The `stream.read` intrinsic.
4211    fn stream_read(
4212        &mut self,
4213        instance: Instance,
4214        caller: RuntimeComponentInstanceIndex,
4215        ty: TypeStreamTableIndex,
4216        options: OptionsIndex,
4217        stream: u32,
4218        address: u32,
4219        count: u32,
4220    ) -> Result<u32>;
4221
4222    /// The "fast-path" implementation of the `stream.write` intrinsic for
4223    /// "flat" (i.e. memcpy-able) payloads.
4224    fn flat_stream_write(
4225        &mut self,
4226        instance: Instance,
4227        caller: RuntimeComponentInstanceIndex,
4228        ty: TypeStreamTableIndex,
4229        options: OptionsIndex,
4230        payload_size: u32,
4231        payload_align: u32,
4232        stream: u32,
4233        address: u32,
4234        count: u32,
4235    ) -> Result<u32>;
4236
4237    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
4238    /// (i.e. memcpy-able) payloads.
4239    fn flat_stream_read(
4240        &mut self,
4241        instance: Instance,
4242        caller: RuntimeComponentInstanceIndex,
4243        ty: TypeStreamTableIndex,
4244        options: OptionsIndex,
4245        payload_size: u32,
4246        payload_align: u32,
4247        stream: u32,
4248        address: u32,
4249        count: u32,
4250    ) -> Result<u32>;
4251
4252    /// The `stream.drop-writable` intrinsic.
4253    fn stream_drop_writable(
4254        &mut self,
4255        instance: Instance,
4256        ty: TypeStreamTableIndex,
4257        writer: u32,
4258    ) -> Result<()>;
4259
4260    /// The `error-context.debug-message` intrinsic.
4261    fn error_context_debug_message(
4262        &mut self,
4263        instance: Instance,
4264        ty: TypeComponentLocalErrorContextTableIndex,
4265        options: OptionsIndex,
4266        err_ctx_handle: u32,
4267        debug_msg_address: u32,
4268    ) -> Result<()>;
4269
4270    /// The `thread.new-indirect` intrinsic
4271    fn thread_new_indirect(
4272        &mut self,
4273        instance: Instance,
4274        caller: RuntimeComponentInstanceIndex,
4275        func_ty_idx: TypeFuncIndex,
4276        start_func_table_idx: RuntimeTableIndex,
4277        start_func_idx: u32,
4278        context: i32,
4279    ) -> Result<u32>;
4280}
4281
4282/// SAFETY: See trait docs.
4283impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4284    unsafe fn prepare_call(
4285        &mut self,
4286        instance: Instance,
4287        memory: *mut VMMemoryDefinition,
4288        start: NonNull<VMFuncRef>,
4289        return_: NonNull<VMFuncRef>,
4290        caller_instance: RuntimeComponentInstanceIndex,
4291        callee_instance: RuntimeComponentInstanceIndex,
4292        task_return_type: TypeTupleIndex,
4293        callee_async: bool,
4294        string_encoding: StringEncoding,
4295        result_count_or_max_if_async: u32,
4296        storage: *mut ValRaw,
4297        storage_len: usize,
4298    ) -> Result<()> {
4299        // SAFETY: The `wasmtime_cranelift`-generated code that calls
4300        // this method will have ensured that `storage` is a valid
4301        // pointer containing at least `storage_len` items.
4302        let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4303
4304        unsafe {
4305            instance.prepare_call(
4306                StoreContextMut(self),
4307                start,
4308                return_,
4309                caller_instance,
4310                callee_instance,
4311                task_return_type,
4312                callee_async,
4313                memory,
4314                string_encoding,
4315                match result_count_or_max_if_async {
4316                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4317                        params,
4318                        has_result: false,
4319                    },
4320                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4321                        params,
4322                        has_result: true,
4323                    },
4324                    result_count => CallerInfo::Sync {
4325                        params,
4326                        result_count,
4327                    },
4328                },
4329            )
4330        }
4331    }
4332
4333    unsafe fn sync_start(
4334        &mut self,
4335        instance: Instance,
4336        callback: *mut VMFuncRef,
4337        callee: NonNull<VMFuncRef>,
4338        param_count: u32,
4339        storage: *mut MaybeUninit<ValRaw>,
4340        storage_len: usize,
4341    ) -> Result<()> {
4342        unsafe {
4343            instance
4344                .start_call(
4345                    StoreContextMut(self),
4346                    callback,
4347                    ptr::null_mut(),
4348                    callee,
4349                    param_count,
4350                    1,
4351                    START_FLAG_ASYNC_CALLEE,
4352                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4353                    // this method will have ensured that `storage` is a valid
4354                    // pointer containing at least `storage_len` items.
4355                    Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4356                )
4357                .map(drop)
4358        }
4359    }
4360
4361    unsafe fn async_start(
4362        &mut self,
4363        instance: Instance,
4364        callback: *mut VMFuncRef,
4365        post_return: *mut VMFuncRef,
4366        callee: NonNull<VMFuncRef>,
4367        param_count: u32,
4368        result_count: u32,
4369        flags: u32,
4370    ) -> Result<u32> {
4371        unsafe {
4372            instance.start_call(
4373                StoreContextMut(self),
4374                callback,
4375                post_return,
4376                callee,
4377                param_count,
4378                result_count,
4379                flags,
4380                None,
4381            )
4382        }
4383    }
4384
4385    fn future_write(
4386        &mut self,
4387        instance: Instance,
4388        caller: RuntimeComponentInstanceIndex,
4389        ty: TypeFutureTableIndex,
4390        options: OptionsIndex,
4391        future: u32,
4392        address: u32,
4393    ) -> Result<u32> {
4394        instance
4395            .guest_write(
4396                StoreContextMut(self),
4397                caller,
4398                TransmitIndex::Future(ty),
4399                options,
4400                None,
4401                future,
4402                address,
4403                1,
4404            )
4405            .map(|result| result.encode())
4406    }
4407
4408    fn future_read(
4409        &mut self,
4410        instance: Instance,
4411        caller: RuntimeComponentInstanceIndex,
4412        ty: TypeFutureTableIndex,
4413        options: OptionsIndex,
4414        future: u32,
4415        address: u32,
4416    ) -> Result<u32> {
4417        instance
4418            .guest_read(
4419                StoreContextMut(self),
4420                caller,
4421                TransmitIndex::Future(ty),
4422                options,
4423                None,
4424                future,
4425                address,
4426                1,
4427            )
4428            .map(|result| result.encode())
4429    }
4430
4431    fn stream_write(
4432        &mut self,
4433        instance: Instance,
4434        caller: RuntimeComponentInstanceIndex,
4435        ty: TypeStreamTableIndex,
4436        options: OptionsIndex,
4437        stream: u32,
4438        address: u32,
4439        count: u32,
4440    ) -> Result<u32> {
4441        instance
4442            .guest_write(
4443                StoreContextMut(self),
4444                caller,
4445                TransmitIndex::Stream(ty),
4446                options,
4447                None,
4448                stream,
4449                address,
4450                count,
4451            )
4452            .map(|result| result.encode())
4453    }
4454
4455    fn stream_read(
4456        &mut self,
4457        instance: Instance,
4458        caller: RuntimeComponentInstanceIndex,
4459        ty: TypeStreamTableIndex,
4460        options: OptionsIndex,
4461        stream: u32,
4462        address: u32,
4463        count: u32,
4464    ) -> Result<u32> {
4465        instance
4466            .guest_read(
4467                StoreContextMut(self),
4468                caller,
4469                TransmitIndex::Stream(ty),
4470                options,
4471                None,
4472                stream,
4473                address,
4474                count,
4475            )
4476            .map(|result| result.encode())
4477    }
4478
4479    fn future_drop_writable(
4480        &mut self,
4481        instance: Instance,
4482        ty: TypeFutureTableIndex,
4483        writer: u32,
4484    ) -> Result<()> {
4485        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4486    }
4487
4488    fn flat_stream_write(
4489        &mut self,
4490        instance: Instance,
4491        caller: RuntimeComponentInstanceIndex,
4492        ty: TypeStreamTableIndex,
4493        options: OptionsIndex,
4494        payload_size: u32,
4495        payload_align: u32,
4496        stream: u32,
4497        address: u32,
4498        count: u32,
4499    ) -> Result<u32> {
4500        instance
4501            .guest_write(
4502                StoreContextMut(self),
4503                caller,
4504                TransmitIndex::Stream(ty),
4505                options,
4506                Some(FlatAbi {
4507                    size: payload_size,
4508                    align: payload_align,
4509                }),
4510                stream,
4511                address,
4512                count,
4513            )
4514            .map(|result| result.encode())
4515    }
4516
4517    fn flat_stream_read(
4518        &mut self,
4519        instance: Instance,
4520        caller: RuntimeComponentInstanceIndex,
4521        ty: TypeStreamTableIndex,
4522        options: OptionsIndex,
4523        payload_size: u32,
4524        payload_align: u32,
4525        stream: u32,
4526        address: u32,
4527        count: u32,
4528    ) -> Result<u32> {
4529        instance
4530            .guest_read(
4531                StoreContextMut(self),
4532                caller,
4533                TransmitIndex::Stream(ty),
4534                options,
4535                Some(FlatAbi {
4536                    size: payload_size,
4537                    align: payload_align,
4538                }),
4539                stream,
4540                address,
4541                count,
4542            )
4543            .map(|result| result.encode())
4544    }
4545
4546    fn stream_drop_writable(
4547        &mut self,
4548        instance: Instance,
4549        ty: TypeStreamTableIndex,
4550        writer: u32,
4551    ) -> Result<()> {
4552        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4553    }
4554
4555    fn error_context_debug_message(
4556        &mut self,
4557        instance: Instance,
4558        ty: TypeComponentLocalErrorContextTableIndex,
4559        options: OptionsIndex,
4560        err_ctx_handle: u32,
4561        debug_msg_address: u32,
4562    ) -> Result<()> {
4563        instance.error_context_debug_message(
4564            StoreContextMut(self),
4565            ty,
4566            options,
4567            err_ctx_handle,
4568            debug_msg_address,
4569        )
4570    }
4571
4572    fn thread_new_indirect(
4573        &mut self,
4574        instance: Instance,
4575        caller: RuntimeComponentInstanceIndex,
4576        func_ty_idx: TypeFuncIndex,
4577        start_func_table_idx: RuntimeTableIndex,
4578        start_func_idx: u32,
4579        context: i32,
4580    ) -> Result<u32> {
4581        instance.thread_new_indirect(
4582            StoreContextMut(self),
4583            caller,
4584            func_ty_idx,
4585            start_func_table_idx,
4586            start_func_idx,
4587            context,
4588        )
4589    }
4590}
4591
4592type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4593
4594/// Represents the state of a pending host task.
4595///
4596/// This is used to represent tasks when the guest calls into the host.
4597pub(crate) struct HostTask {
4598    common: WaitableCommon,
4599
4600    /// Guest thread which called the host.
4601    caller: QualifiedThreadId,
4602
4603    /// State of borrows/etc the host needs to track. Used when the guest passes
4604    /// borrows to the host, for example.
4605    call_context: CallContext,
4606
4607    state: HostTaskState,
4608}
4609
4610enum HostTaskState {
4611    /// A host task has been created and it's considered "started".
4612    ///
4613    /// The host task has yet to enter `first_poll` or `poll_and_block` which
4614    /// is where this will get updated further.
4615    CalleeStarted,
4616
4617    /// State used for tasks in `first_poll` meaning that the guest did an async
4618    /// lower of a host async function which is blocked. The specified handle is
4619    /// linked to the future in the main `FuturesUnordered` of a store which is
4620    /// used to cancel it if the guest requests cancellation.
4621    CalleeRunning(JoinHandle),
4622
4623    /// Terminal state used for tasks in `poll_and_block` to store the result of
4624    /// their computation. Note that this state is not used for tasks in
4625    /// `first_poll`.
4626    CalleeFinished(LiftedResult),
4627
4628    /// Terminal state for host tasks meaning that the task was cancelled or the
4629    /// result was taken.
4630    CalleeDone { cancelled: bool },
4631}
4632
4633impl HostTask {
4634    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4635        Self {
4636            common: WaitableCommon::default(),
4637            call_context: CallContext::default(),
4638            caller,
4639            state,
4640        }
4641    }
4642}
4643
4644impl TableDebug for HostTask {
4645    fn type_name() -> &'static str {
4646        "HostTask"
4647    }
4648}
4649
4650type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4651
4652/// Represents the caller of a given guest task.
4653enum Caller {
4654    /// The host called the guest task.
4655    Host {
4656        /// If present, may be used to deliver the result.
4657        tx: Option<oneshot::Sender<LiftedResult>>,
4658        /// If true, there's a host future that must be dropped before the task
4659        /// can be deleted.
4660        host_future_present: bool,
4661        /// Represents the caller of the host function which called back into a
4662        /// guest. Note that this thread could belong to an entirely unrelated
4663        /// top-level component instance than the one the host called into.
4664        caller: CurrentThread,
4665    },
4666    /// Another guest thread called the guest task
4667    Guest {
4668        /// The id of the caller
4669        thread: QualifiedThreadId,
4670    },
4671}
4672
4673/// Represents a closure and related canonical ABI parameters required to
4674/// validate a `task.return` call at runtime and lift the result.
4675struct LiftResult {
4676    lift: RawLift,
4677    ty: TypeTupleIndex,
4678    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4679    string_encoding: StringEncoding,
4680}
4681
4682/// The table ID for a guest thread, qualified by the task to which it belongs.
4683///
4684/// This exists to minimize table lookups and the necessity to pass stores around mutably
4685/// for the common case of identifying the task to which a thread belongs.
4686#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4687pub(crate) struct QualifiedThreadId {
4688    task: TableId<GuestTask>,
4689    thread: TableId<GuestThread>,
4690}
4691
4692impl QualifiedThreadId {
4693    fn qualify(
4694        state: &mut ConcurrentState,
4695        thread: TableId<GuestThread>,
4696    ) -> Result<QualifiedThreadId> {
4697        Ok(QualifiedThreadId {
4698            task: state.get_mut(thread)?.parent_task,
4699            thread,
4700        })
4701    }
4702}
4703
4704impl fmt::Debug for QualifiedThreadId {
4705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4706        f.debug_tuple("QualifiedThreadId")
4707            .field(&self.task.rep())
4708            .field(&self.thread.rep())
4709            .finish()
4710    }
4711}
4712
4713enum GuestThreadState {
4714    NotStartedImplicit,
4715    NotStartedExplicit(
4716        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4717    ),
4718    Running,
4719    Suspended(StoreFiber<'static>),
4720    Ready {
4721        fiber: StoreFiber<'static>,
4722        cancellable: bool,
4723    },
4724    Completed,
4725}
4726pub struct GuestThread {
4727    /// Context-local state used to implement the `context.{get,set}`
4728    /// intrinsics.
4729    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
4730    /// The owning guest task.
4731    parent_task: TableId<GuestTask>,
4732    /// If present, indicates that the thread is currently waiting on the
4733    /// specified set but may be cancelled and woken immediately.
4734    wake_on_cancel: Option<TableId<WaitableSet>>,
4735    /// The execution state of this guest thread
4736    state: GuestThreadState,
4737    /// The index of this thread in the component instance's handle table.
4738    /// This must always be `Some` after initialization.
4739    instance_rep: Option<u32>,
4740    /// Scratch waitable set used to watch subtasks during synchronous calls.
4741    sync_call_set: TableId<WaitableSet>,
4742}
4743
4744impl GuestThread {
4745    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4746    /// handle.
4747    fn from_instance(
4748        state: Pin<&mut ComponentInstance>,
4749        caller_instance: RuntimeComponentInstanceIndex,
4750        guest_thread: u32,
4751    ) -> Result<TableId<Self>> {
4752        let rep = state.instance_states().0[caller_instance]
4753            .thread_handle_table()
4754            .guest_thread_rep(guest_thread)?;
4755        Ok(TableId::new(rep))
4756    }
4757
4758    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4759        let sync_call_set = state.push(WaitableSet {
4760            is_sync_call_set: true,
4761            ..WaitableSet::default()
4762        })?;
4763        Ok(Self {
4764            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4765            parent_task,
4766            wake_on_cancel: None,
4767            state: GuestThreadState::NotStartedImplicit,
4768            instance_rep: None,
4769            sync_call_set,
4770        })
4771    }
4772
4773    fn new_explicit(
4774        state: &mut ConcurrentState,
4775        parent_task: TableId<GuestTask>,
4776        start_func: Box<
4777            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4778        >,
4779    ) -> Result<Self> {
4780        let sync_call_set = state.push(WaitableSet {
4781            is_sync_call_set: true,
4782            ..WaitableSet::default()
4783        })?;
4784        Ok(Self {
4785            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4786            parent_task,
4787            wake_on_cancel: None,
4788            state: GuestThreadState::NotStartedExplicit(start_func),
4789            instance_rep: None,
4790            sync_call_set,
4791        })
4792    }
4793}
4794
4795impl TableDebug for GuestThread {
4796    fn type_name() -> &'static str {
4797        "GuestThread"
4798    }
4799}
4800
4801enum SyncResult {
4802    NotProduced,
4803    Produced(Option<ValRaw>),
4804    Taken,
4805}
4806
4807impl SyncResult {
4808    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4809        Ok(match mem::replace(self, SyncResult::Taken) {
4810            SyncResult::NotProduced => None,
4811            SyncResult::Produced(val) => Some(val),
4812            SyncResult::Taken => {
4813                bail_bug!("attempted to take a synchronous result that was already taken")
4814            }
4815        })
4816    }
4817}
4818
4819#[derive(Debug)]
4820enum HostFutureState {
4821    NotApplicable,
4822    Live,
4823    Dropped,
4824}
4825
4826/// Represents a pending guest task.
4827pub(crate) struct GuestTask {
4828    /// See `WaitableCommon`
4829    common: WaitableCommon,
4830    /// Closure to lower the parameters passed to this task.
4831    lower_params: Option<RawLower>,
4832    /// See `LiftResult`
4833    lift_result: Option<LiftResult>,
4834    /// A place to stash the type-erased lifted result if it can't be delivered
4835    /// immediately.
4836    result: Option<LiftedResult>,
4837    /// Closure to call the callback function for an async-lifted export, if
4838    /// provided.
4839    callback: Option<CallbackFn>,
4840    /// See `Caller`
4841    caller: Caller,
4842    /// Borrow state for this task.
4843    ///
4844    /// Keeps track of `borrow<T>` received to this task to ensure that
4845    /// everything is dropped by the time it exits.
4846    call_context: CallContext,
4847    /// A place to stash the lowered result for a sync-to-async call until it
4848    /// can be returned to the caller.
4849    sync_result: SyncResult,
4850    /// Whether or not the task has been cancelled (i.e. whether the task is
4851    /// permitted to call `task.cancel`).
4852    cancel_sent: bool,
4853    /// Whether or not we've sent a `Status::Starting` event to any current or
4854    /// future waiters for this waitable.
4855    starting_sent: bool,
4856    /// The runtime instance to which the exported function for this guest task
4857    /// belongs.
4858    ///
4859    /// Note that the task may do a sync->sync call via a fused adapter which
4860    /// results in that task executing code in a different instance, and it may
4861    /// call host functions and intrinsics from that other instance.
4862    instance: RuntimeInstance,
4863    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4864    /// delivered to this task.
4865    event: Option<Event>,
4866    /// Whether or not the task has exited.
4867    exited: bool,
4868    /// Threads belonging to this task
4869    threads: HashSet<TableId<GuestThread>>,
4870    /// The state of the host future that represents an async task, which must
4871    /// be dropped before we can delete the task.
4872    host_future_state: HostFutureState,
4873    /// Indicates whether this task was created for a call to an async-lifted
4874    /// export.
4875    async_function: bool,
4876
4877    decremented_interesting_task_count: bool,
4878}
4879
4880impl GuestTask {
4881    fn already_lowered_parameters(&self) -> bool {
4882        // We reset `lower_params` after we lower the parameters
4883        self.lower_params.is_none()
4884    }
4885
4886    fn returned_or_cancelled(&self) -> bool {
4887        // We reset `lift_result` after we return or exit
4888        self.lift_result.is_none()
4889    }
4890
4891    fn ready_to_delete(&self) -> bool {
4892        let threads_completed = self.threads.is_empty();
4893        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4894        let pending_completion_event = matches!(
4895            self.common.event,
4896            Some(Event::Subtask {
4897                status: Status::Returned | Status::ReturnCancelled
4898            })
4899        );
4900        let ready = threads_completed
4901            && !has_sync_result
4902            && !pending_completion_event
4903            && !matches!(self.host_future_state, HostFutureState::Live);
4904        log::trace!(
4905            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4906            threads_completed,
4907            has_sync_result,
4908            pending_completion_event,
4909            self.host_future_state
4910        );
4911        ready
4912    }
4913
4914    fn new(
4915        state: &mut ConcurrentState,
4916        lower_params: RawLower,
4917        lift_result: LiftResult,
4918        caller: Caller,
4919        callback: Option<CallbackFn>,
4920        instance: RuntimeInstance,
4921        async_function: bool,
4922    ) -> Result<QualifiedThreadId> {
4923        let host_future_state = match &caller {
4924            Caller::Guest { .. } => HostFutureState::NotApplicable,
4925            Caller::Host {
4926                host_future_present,
4927                ..
4928            } => {
4929                if *host_future_present {
4930                    HostFutureState::Live
4931                } else {
4932                    HostFutureState::NotApplicable
4933                }
4934            }
4935        };
4936        let task = state.push(Self {
4937            common: WaitableCommon::default(),
4938            lower_params: Some(lower_params),
4939            lift_result: Some(lift_result),
4940            result: None,
4941            callback,
4942            caller,
4943            call_context: CallContext::default(),
4944            sync_result: SyncResult::NotProduced,
4945            cancel_sent: false,
4946            starting_sent: false,
4947            instance,
4948            event: None,
4949            exited: false,
4950            threads: HashSet::new(),
4951            host_future_state,
4952            async_function,
4953            decremented_interesting_task_count: false,
4954        })?;
4955        let new_thread = GuestThread::new_implicit(state, task)?;
4956        let thread = state.push(new_thread)?;
4957        state.get_mut(task)?.threads.insert(thread);
4958        state.interesting_tasks += 1;
4959        Ok(QualifiedThreadId { task, thread })
4960    }
4961}
4962
4963impl TableDebug for GuestTask {
4964    fn type_name() -> &'static str {
4965        "GuestTask"
4966    }
4967}
4968
4969/// Represents state common to all kinds of waitables.
4970#[derive(Default)]
4971struct WaitableCommon {
4972    /// The currently pending event for this waitable, if any.
4973    event: Option<Event>,
4974    /// The set to which this waitable belongs, if any.
4975    set: Option<TableId<WaitableSet>>,
4976    /// The handle with which the guest refers to this waitable, if any.
4977    handle: Option<u32>,
4978}
4979
4980/// Represents a Component Model Async `waitable`.
4981#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4982enum Waitable {
4983    /// A host task
4984    Host(TableId<HostTask>),
4985    /// A guest task
4986    Guest(TableId<GuestTask>),
4987    /// The read or write end of a stream or future
4988    Transmit(TableId<TransmitHandle>),
4989}
4990
4991impl Waitable {
4992    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4993    /// handle.
4994    fn from_instance(
4995        state: Pin<&mut ComponentInstance>,
4996        caller_instance: RuntimeComponentInstanceIndex,
4997        waitable: u32,
4998    ) -> Result<Self> {
4999        use crate::runtime::vm::component::Waitable;
5000
5001        let (waitable, kind) = state.instance_states().0[caller_instance]
5002            .handle_table()
5003            .waitable_rep(waitable)?;
5004
5005        Ok(match kind {
5006            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
5007            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
5008            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
5009        })
5010    }
5011
5012    /// Retrieve the host-visible identifier for this `Waitable`.
5013    fn rep(&self) -> u32 {
5014        match self {
5015            Self::Host(id) => id.rep(),
5016            Self::Guest(id) => id.rep(),
5017            Self::Transmit(id) => id.rep(),
5018        }
5019    }
5020
5021    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
5022    /// remove it from any set it may currently belong to (when `set` is
5023    /// `None`).
5024    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
5025        log::trace!("waitable {self:?} join set {set:?}");
5026
5027        let old = mem::replace(&mut self.common(state)?.set, set);
5028
5029        if let Some(old) = old {
5030            match *self {
5031                Waitable::Host(id) => state.remove_child(id, old),
5032                Waitable::Guest(id) => state.remove_child(id, old),
5033                Waitable::Transmit(id) => state.remove_child(id, old),
5034            }?;
5035
5036            state.get_mut(old)?.ready.remove(self);
5037        }
5038
5039        if let Some(set) = set {
5040            match *self {
5041                Waitable::Host(id) => state.add_child(id, set),
5042                Waitable::Guest(id) => state.add_child(id, set),
5043                Waitable::Transmit(id) => state.add_child(id, set),
5044            }?;
5045
5046            if self.common(state)?.event.is_some() {
5047                self.mark_ready(state)?;
5048            }
5049        }
5050
5051        Ok(())
5052    }
5053
5054    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
5055    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
5056        Ok(match self {
5057            Self::Host(id) => &mut state.get_mut(*id)?.common,
5058            Self::Guest(id) => &mut state.get_mut(*id)?.common,
5059            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
5060        })
5061    }
5062
5063    /// Set or clear the pending event for this waitable and either deliver it
5064    /// to the first waiter, if any, or mark it as ready to be delivered to the
5065    /// next waiter that arrives.
5066    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
5067        log::trace!("set event for {self:?}: {event:?}");
5068        self.common(state)?.event = event;
5069        self.mark_ready(state)
5070    }
5071
5072    /// Take the pending event from this waitable, leaving `None` in its place.
5073    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
5074        let common = self.common(state)?;
5075        let event = common.event.take();
5076        if let Some(set) = self.common(state)?.set {
5077            state.get_mut(set)?.ready.remove(self);
5078        }
5079
5080        Ok(event)
5081    }
5082
5083    /// Deliver the current event for this waitable to the first waiter, if any,
5084    /// or else mark it as ready to be delivered to the next waiter that
5085    /// arrives.
5086    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
5087        if let Some(set) = self.common(state)?.set {
5088            state.get_mut(set)?.ready.insert(*self);
5089            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
5090                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
5091                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
5092
5093                let item = match mode {
5094                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
5095                    WaitMode::Callback(instance) => WorkItem::GuestCall(
5096                        state.get_mut(thread.task)?.instance.index,
5097                        GuestCall {
5098                            thread,
5099                            kind: GuestCallKind::DeliverEvent {
5100                                instance,
5101                                set: Some(set),
5102                            },
5103                        },
5104                    ),
5105                };
5106                state.push_high_priority(item);
5107            }
5108        }
5109        Ok(())
5110    }
5111
5112    /// Remove this waitable from the instance's rep table.
5113    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
5114        match self {
5115            Self::Host(task) => {
5116                log::trace!("delete host task {task:?}");
5117                state.delete(*task)?;
5118            }
5119            Self::Guest(task) => {
5120                log::trace!("delete guest task {task:?}");
5121                let task = state.delete(*task)?;
5122
5123                // When a guest task is created it increments the
5124                // `ConcurrentState::interesting_tasks` counter, and that needs
5125                // to be paired with a decrement. There are a few situations in
5126                // which the decrement needs to happen which don't all funnel
5127                // through here, so in lieu of that at least try to catch issues
5128                // where we forgot to do a decrement.
5129                debug_assert!(task.decremented_interesting_task_count);
5130            }
5131            Self::Transmit(task) => {
5132                state.delete(*task)?;
5133            }
5134        }
5135
5136        Ok(())
5137    }
5138}
5139
5140impl fmt::Debug for Waitable {
5141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5142        match self {
5143            Self::Host(id) => write!(f, "{id:?}"),
5144            Self::Guest(id) => write!(f, "{id:?}"),
5145            Self::Transmit(id) => write!(f, "{id:?}"),
5146        }
5147    }
5148}
5149
5150/// Represents a Component Model Async `waitable-set`.
5151#[derive(Default)]
5152struct WaitableSet {
5153    /// Which waitables in this set have pending events, if any.
5154    ready: BTreeSet<Waitable>,
5155    /// Which guest threads are currently waiting on this set, if any.
5156    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
5157    /// Whether this set is a synthetic, internal one meant for handling
5158    /// synchronous calls.
5159    is_sync_call_set: bool,
5160}
5161
5162impl TableDebug for WaitableSet {
5163    fn type_name() -> &'static str {
5164        "WaitableSet"
5165    }
5166}
5167
5168/// Type-erased closure to lower the parameters for a guest task.
5169type RawLower =
5170    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
5171
5172/// Type-erased closure to lift the result for a guest task.
5173type RawLift = Box<
5174    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5175>;
5176
5177/// Type erased result of a guest task which may be downcast to the expected
5178/// type by a host caller (or simply ignored in the case of a guest caller; see
5179/// `DummyResult`).
5180type LiftedResult = Box<dyn Any + Send + Sync>;
5181
5182/// Used to return a result from a `LiftFn` when the actual result has already
5183/// been lowered to a guest task's stack and linear memory.
5184struct DummyResult;
5185
5186/// Represents the Component Model Async state of a (sub-)component instance.
5187#[derive(Default)]
5188pub struct ConcurrentInstanceState {
5189    /// Whether backpressure is set for this instance (enabled if >0)
5190    backpressure: u16,
5191    /// Whether this instance can be entered
5192    do_not_enter: bool,
5193    /// Pending calls for this instance which require `Self::backpressure` to be
5194    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
5195    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
5196}
5197
5198impl ConcurrentInstanceState {
5199    pub fn pending_is_empty(&self) -> bool {
5200        self.pending.is_empty()
5201    }
5202}
5203
5204#[derive(Debug, Copy, Clone)]
5205pub(crate) enum CurrentThread {
5206    Guest(QualifiedThreadId),
5207    Host(TableId<HostTask>),
5208    None,
5209}
5210
5211impl CurrentThread {
5212    fn guest(&self) -> Option<&QualifiedThreadId> {
5213        match self {
5214            Self::Guest(id) => Some(id),
5215            _ => None,
5216        }
5217    }
5218
5219    fn host(&self) -> Option<TableId<HostTask>> {
5220        match self {
5221            Self::Host(id) => Some(*id),
5222            _ => None,
5223        }
5224    }
5225
5226    fn is_none(&self) -> bool {
5227        matches!(self, Self::None)
5228    }
5229}
5230
5231impl From<QualifiedThreadId> for CurrentThread {
5232    fn from(id: QualifiedThreadId) -> Self {
5233        Self::Guest(id)
5234    }
5235}
5236
5237impl From<TableId<HostTask>> for CurrentThread {
5238    fn from(id: TableId<HostTask>) -> Self {
5239        Self::Host(id)
5240    }
5241}
5242
5243/// Represents the Component Model Async state of a store.
5244pub struct ConcurrentState {
5245    /// The currently running thread, if any.
5246    ///
5247    /// Note that we lazily materialize threads on-demand and this field is not
5248    /// necessarily up-to-date. The `StoreOpaque::current_thread` method should
5249    /// be preferred over directly accessing this field.
5250    unforced_current_thread: CurrentThread,
5251
5252    /// The set of pending host and background tasks, if any.
5253    ///
5254    /// See `ComponentInstance::poll_until` for where we temporarily take this
5255    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
5256    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5257    /// The table of waitables, waitable sets, etc.
5258    table: AlwaysMut<ResourceTable>,
5259    /// The "high priority" work queue for this store's event loop.
5260    high_priority: Vec<WorkItem>,
5261    /// The "low priority" work queue for this store's event loop.
5262    low_priority: VecDeque<WorkItem>,
5263    /// A place to stash the reason a fiber is suspending so that the code which
5264    /// resumed it will know under what conditions the fiber should be resumed
5265    /// again.
5266    suspend_reason: Option<SuspendReason>,
5267    /// A cached fiber which is waiting for work to do.
5268    ///
5269    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
5270    worker: Option<StoreFiber<'static>>,
5271    /// A place to stash the work item for which we're resuming a worker fiber.
5272    worker_item: Option<WorkerItem>,
5273
5274    /// Reference counts for all component error contexts
5275    ///
5276    /// NOTE: it is possible the global ref count to be *greater* than the sum of
5277    /// (sub)component ref counts as tracked by `error_context_tables`, for
5278    /// example when the host holds one or more references to error contexts.
5279    ///
5280    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
5281    /// component-wide representation) of the index into concurrent state for a given
5282    /// stored `ErrorContext`.
5283    ///
5284    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
5285    /// as a `TableId<ErrorContextState>`.
5286    global_error_context_ref_counts:
5287        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5288
5289    /// The number of "interesting tasks" currently executing in the store.
5290    ///
5291    /// This tracks the concept of a component instance lifetime as defined in
5292    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
5293    /// all tasks currently increment this counter which then gets decremented
5294    /// when they exit. In the future some tasks might not increment this
5295    /// counter, but for now all do.
5296    ///
5297    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
5298    /// inform the embedder when all tasks have completed. This is then
5299    /// used in wasmtime-wasi-http, for example, to know when an instance is
5300    /// idle.
5301    interesting_tasks: usize,
5302
5303    /// Single waker to notify when `interesting_tasks` reaches 0.
5304    ///
5305    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
5306    interesting_tasks_empty_waker: Option<Waker>,
5307
5308    /// Single waker to notify when a component instance goes from
5309    /// not-concurrently-callable to concurrently-callable.
5310    ///
5311    /// Used in the implementation of `Accessor::poll_ready_for_concurrent_call`.
5312    ready_for_concurrent_call_waker: Option<Waker>,
5313}
5314
5315impl Default for ConcurrentState {
5316    fn default() -> Self {
5317        Self {
5318            unforced_current_thread: CurrentThread::None,
5319            table: AlwaysMut::new(ResourceTable::new()),
5320            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5321            high_priority: Vec::new(),
5322            low_priority: VecDeque::new(),
5323            suspend_reason: None,
5324            worker: None,
5325            worker_item: None,
5326            global_error_context_ref_counts: BTreeMap::new(),
5327            interesting_tasks: 0,
5328            interesting_tasks_empty_waker: None,
5329            ready_for_concurrent_call_waker: None,
5330        }
5331    }
5332}
5333
5334impl ConcurrentState {
5335    /// Take ownership of any fibers and futures owned by this object.
5336    ///
5337    /// This should be used when disposing of the `Store` containing this object
5338    /// in order to gracefully resolve any and all fibers using
5339    /// `StoreFiber::dispose`.  This is necessary to avoid possible
5340    /// use-after-free bugs due to fibers which may still have access to the
5341    /// `Store`.
5342    ///
5343    /// Additionally, the futures collected with this function should be dropped
5344    /// within a `tls::set` call, which will ensure than any futures closing
5345    /// over an `&Accessor` will have access to the store when dropped, allowing
5346    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
5347    /// panicking.
5348    ///
5349    /// Note that this will leave the object in an inconsistent and unusable
5350    /// state, so it should only be used just prior to dropping it.
5351    pub(crate) fn take_fibers_and_futures(
5352        &mut self,
5353        fibers: &mut Vec<StoreFiber<'static>>,
5354        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
5355    ) {
5356        for entry in self.table.get_mut().iter_mut() {
5357            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5358                for mode in mem::take(&mut set.waiting).into_values() {
5359                    if let WaitMode::Fiber(fiber) = mode {
5360                        fibers.push(fiber);
5361                    }
5362                }
5363            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5364                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5365                    mem::replace(&mut thread.state, GuestThreadState::Completed)
5366                {
5367                    fibers.push(fiber);
5368                }
5369            }
5370        }
5371
5372        if let Some(fiber) = self.worker.take() {
5373            fibers.push(fiber);
5374        }
5375
5376        let mut handle_item = |item| match item {
5377            WorkItem::ResumeFiber(fiber) => {
5378                fibers.push(fiber);
5379            }
5380            WorkItem::PushFuture(future) => {
5381                self.futures
5382                    .get_mut()
5383                    .as_mut()
5384                    .unwrap()
5385                    .push(future.into_inner());
5386            }
5387            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5388            }
5389        };
5390
5391        for item in mem::take(&mut self.high_priority) {
5392            handle_item(item);
5393        }
5394        for item in mem::take(&mut self.low_priority) {
5395            handle_item(item);
5396        }
5397
5398        if let Some(them) = self.futures.get_mut().take() {
5399            futures.push(them);
5400        }
5401    }
5402
5403    #[cfg(feature = "gc")]
5404    pub(crate) fn trace_fiber_roots(
5405        &mut self,
5406        modules: &ModuleRegistry,
5407        unwind: &dyn Unwind,
5408        gc_roots_list: &mut GcRootsList,
5409    ) {
5410        let ConcurrentState {
5411            table,
5412            worker,
5413            high_priority,
5414            low_priority,
5415
5416            // TODO(cm-gc): This field contains `ValRaw`s, but they are never GC
5417            // references because the component model doesn't support GC yet. We
5418            // will need to trace these somehow when it does.
5419            futures: _,
5420
5421            // These fields do not contain GC references.
5422            worker_item: _,
5423            unforced_current_thread: _,
5424            suspend_reason: _,
5425            global_error_context_ref_counts: _,
5426            interesting_tasks: _,
5427            interesting_tasks_empty_waker: _,
5428            ready_for_concurrent_call_waker: _,
5429        } = self;
5430
5431        for entry in table.get_mut().iter_mut() {
5432            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
5433                for mode in set.waiting.values_mut() {
5434                    if let WaitMode::Fiber(fiber) = mode {
5435                        fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5436                    }
5437                }
5438            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
5439                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
5440                    &mut thread.state
5441                {
5442                    fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5443                }
5444            }
5445        }
5446
5447        if let Some(fiber) = worker {
5448            fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5449        }
5450
5451        let mut handle_item = |item: &mut WorkItem| match item {
5452            WorkItem::ResumeFiber(fiber) => {
5453                fiber.trace_gc_roots(modules, unwind, gc_roots_list);
5454            }
5455            WorkItem::PushFuture(_future) => {
5456                // TODO(cm-gc): once futures can contain GC roots, we will need
5457                // to trace them.
5458            }
5459            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
5460            }
5461        };
5462
5463        for item in high_priority {
5464            handle_item(item);
5465        }
5466        for item in low_priority {
5467            handle_item(item);
5468        }
5469    }
5470
5471    fn push<V: Send + Sync + 'static>(
5472        &mut self,
5473        value: V,
5474    ) -> Result<TableId<V>, ResourceTableError> {
5475        self.table.get_mut().push(value).map(TableId::from)
5476    }
5477
5478    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5479        self.table.get_mut().get_mut(&Resource::from(id))
5480    }
5481
5482    pub fn add_child<T: 'static, U: 'static>(
5483        &mut self,
5484        child: TableId<T>,
5485        parent: TableId<U>,
5486    ) -> Result<(), ResourceTableError> {
5487        self.table
5488            .get_mut()
5489            .add_child(Resource::from(child), Resource::from(parent))
5490    }
5491
5492    pub fn remove_child<T: 'static, U: 'static>(
5493        &mut self,
5494        child: TableId<T>,
5495        parent: TableId<U>,
5496    ) -> Result<(), ResourceTableError> {
5497        self.table
5498            .get_mut()
5499            .remove_child(Resource::from(child), Resource::from(parent))
5500    }
5501
5502    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5503        self.table.get_mut().delete(Resource::from(id))
5504    }
5505
5506    fn push_future(&mut self, future: HostTaskFuture) {
5507        // Note that we can't directly push to `ConcurrentState::futures` here
5508        // since this may be called from a future that's being polled inside
5509        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5510        // so it has exclusive access while polling it.  Therefore, we push a
5511        // work item to the "high priority" queue, which will actually push to
5512        // `ConcurrentState::futures` later.
5513        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5514    }
5515
5516    fn push_high_priority(&mut self, item: WorkItem) {
5517        log::trace!("push high priority: {item:?}");
5518        self.high_priority.push(item);
5519    }
5520
5521    fn push_low_priority(&mut self, item: WorkItem) {
5522        log::trace!("push low priority: {item:?}");
5523        self.low_priority.push_front(item);
5524    }
5525
5526    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5527        if high_priority {
5528            self.push_high_priority(item);
5529        } else {
5530            self.push_low_priority(item);
5531        }
5532    }
5533
5534    fn promote_instance_local_thread_work_item(
5535        &mut self,
5536        current_instance: RuntimeComponentInstanceIndex,
5537    ) -> bool {
5538        self.promote_work_items_matching(|item: &WorkItem| match item {
5539            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5540                *instance == current_instance
5541            }
5542            _ => false,
5543        })
5544    }
5545
5546    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5547        self.promote_work_items_matching(|item: &WorkItem| match item {
5548            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5549                *t == thread
5550            }
5551            _ => false,
5552        })
5553    }
5554
5555    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5556    where
5557        F: FnMut(&WorkItem) -> bool,
5558    {
5559        // If there's a high-priority work item to resume the current guest thread,
5560        // we don't need to promote anything, but we return true to indicate that
5561        // work is pending for the current instance.
5562        if self.high_priority.iter().any(&mut predicate) {
5563            true
5564        }
5565        // Otherwise, look for a low-priority work item that matches the current
5566        // instance and promote it to high-priority.
5567        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5568            let item = self.low_priority.remove(idx).unwrap();
5569            self.push_high_priority(item);
5570            true
5571        } else {
5572            false
5573        }
5574    }
5575
5576    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5577        if self.may_block(task)? {
5578            Ok(())
5579        } else {
5580            Err(Trap::CannotBlockSyncTask.into())
5581        }
5582    }
5583
5584    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5585        let task = self.get_mut(task)?;
5586        Ok(task.async_function || task.returned_or_cancelled())
5587    }
5588
5589    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5590    /// specified task.
5591    ///
5592    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5593    /// below.
5594    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5595        let (task, is_host) = (task >> 1, task & 1 == 1);
5596        if is_host {
5597            let task: TableId<HostTask> = TableId::new(task);
5598            Ok(&mut self.get_mut(task)?.call_context)
5599        } else {
5600            let task: TableId<GuestTask> = TableId::new(task);
5601            Ok(&mut self.get_mut(task)?.call_context)
5602        }
5603    }
5604
5605    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5606        match self.futures.get_mut().as_mut() {
5607            Some(f) => Ok(f),
5608            None => bail_bug!("futures field of concurrent state is currently taken"),
5609        }
5610    }
5611
5612    pub(crate) fn table(&mut self) -> &mut ResourceTable {
5613        self.table.get_mut()
5614    }
5615
5616    /// Returns the parent thread, if any, of `cur`.
5617    fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5618        match cur {
5619            CurrentThread::Guest(thread) => {
5620                let task = self.get_mut(thread.task).ok()?;
5621                Some(match task.caller {
5622                    Caller::Host { caller, .. } => caller,
5623                    Caller::Guest { thread } => thread.into(),
5624                })
5625            }
5626            CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5627            CurrentThread::None => None,
5628        }
5629    }
5630}
5631
5632/// Provide a type hint to compiler about the shape of a parameter lower
5633/// closure.
5634fn for_any_lower<
5635    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5636>(
5637    fun: F,
5638) -> F {
5639    fun
5640}
5641
5642/// Provide a type hint to compiler about the shape of a result lift closure.
5643fn for_any_lift<
5644    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5645>(
5646    fun: F,
5647) -> F {
5648    fun
5649}
5650
5651fn check_ambient_store(id: StoreId) {
5652    let message = "\
5653        `Future`s which depend on asynchronous component tasks, streams, or \
5654        futures to complete may only be polled from the event loop of the \
5655        store to which they belong.  Please use \
5656        `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5657    ";
5658    tls::try_get(|store| {
5659        let matched = match store {
5660            tls::TryGet::Some(store) => store.id() == id,
5661            tls::TryGet::Taken | tls::TryGet::None => false,
5662        };
5663
5664        if !matched {
5665            panic!("{message}")
5666        }
5667    });
5668}
5669
5670/// Assert that `StoreContextMut::run_concurrent` has not been called from
5671/// within an store's event loop.
5672fn check_recursive_run() {
5673    tls::try_get(|store| {
5674        if !matches!(store, tls::TryGet::None) {
5675            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5676        }
5677    });
5678}
5679
5680fn unpack_callback_code(code: u32) -> (u32, u32) {
5681    (code & 0xF, code >> 4)
5682}
5683
5684/// Helper struct for packaging parameters to be passed to
5685/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5686/// `waitable-set.poll`.
5687struct WaitableCheckParams {
5688    set: TableId<WaitableSet>,
5689    options: OptionsIndex,
5690    payload: u32,
5691}
5692
5693/// Indicates whether `ComponentInstance::waitable_check` is being called for
5694/// `waitable-set.wait` or `waitable-set.poll`.
5695enum WaitableCheck {
5696    Wait,
5697    Poll,
5698}
5699
5700/// An identifier representing a guest task within a component.
5701///
5702/// This can be acquired by calling [`Func::start_call_concurrent`] or
5703/// [`TypedFunc::start_call_concurrent`] and then using the
5704/// [`FuncCallConcurrent::task`] accessor, for example. This can then be
5705/// reflected on with [`StoreContextMut::async_call_stack`].
5706///
5707/// [`TypedFunc::start_call_concurrent`]: crate::component::TypedFunc::start_call_concurrent
5708#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
5709pub struct GuestTaskId(TableId<GuestTask>);
5710
5711/// Represents a guest task called from the host, prepared using `prepare_call`.
5712pub(crate) struct PreparedCall<R> {
5713    /// The guest export to be called
5714    handle: Func,
5715    /// The guest thread created by `prepare_call`
5716    thread: QualifiedThreadId,
5717    /// The number of lowered core Wasm parameters to pass to the call.
5718    param_count: usize,
5719    /// The `oneshot::Receiver` to which the result of the call will be
5720    /// delivered when it is available.
5721    rx: oneshot::Receiver<LiftedResult>,
5722    /// The instance that this call is prepared for.
5723    runtime_instance: RuntimeInstance,
5724    _phantom: PhantomData<R>,
5725}
5726
5727impl<R> PreparedCall<R> {
5728    /// Get a copy of the `TaskId` for this `PreparedCall`.
5729    pub(crate) fn task_id(&self) -> TaskId {
5730        TaskId {
5731            task: self.thread.task,
5732            runtime_instance: self.runtime_instance,
5733        }
5734    }
5735}
5736
5737/// Represents a task created by `prepare_call`.
5738pub(crate) struct TaskId {
5739    task: TableId<GuestTask>,
5740    runtime_instance: RuntimeInstance,
5741}
5742
5743impl TaskId {
5744    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5745    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5746    /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5747    /// and can be resumed by other tasks for this component, so we mark the future as dropped
5748    /// and delete the task when all threads are done.
5749    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
5750        let task = store.concurrent_state_mut()?.get_mut(self.task)?;
5751        let delete = if !task.already_lowered_parameters() {
5752            store.cancel_guest_subtask_without_lowered_parameters(
5753                self.runtime_instance,
5754                self.task,
5755            )?;
5756            true
5757        } else {
5758            task.host_future_state = HostFutureState::Dropped;
5759            task.ready_to_delete()
5760        };
5761        if delete {
5762            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut()?)?
5763        }
5764        Ok(())
5765    }
5766}
5767
5768/// Prepare a call to the specified exported Wasm function, providing functions
5769/// for lowering the parameters and lifting the result.
5770///
5771/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5772/// loop, use `queue_call`.
5773pub(crate) fn prepare_call<T, R>(
5774    mut store: StoreContextMut<T>,
5775    handle: Func,
5776    param_count: usize,
5777    host_future_present: bool,
5778    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5779    + Send
5780    + Sync
5781    + 'static,
5782    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5783    + Send
5784    + Sync
5785    + 'static,
5786) -> Result<PreparedCall<R>> {
5787    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5788
5789    let instance = handle.instance().id().get(store.0);
5790    let options = &instance.component().env_component().options[options];
5791    let ty = &instance.component().types()[ty];
5792    let async_function = ty.async_;
5793    let task_return_type = ty.results;
5794    let component_instance = raw_options.instance;
5795    let callback = options.callback.map(|i| instance.runtime_callback(i));
5796    let memory = options
5797        .memory()
5798        .map(|i| instance.runtime_memory(i))
5799        .map(SendSyncPtr::new);
5800    let string_encoding = options.string_encoding;
5801    let token = StoreToken::new(store.as_context_mut());
5802    let caller = store.0.current_thread()?;
5803    let state = store.0.concurrent_state_mut()?;
5804
5805    let (tx, rx) = oneshot::channel();
5806
5807    let instance = handle.instance().runtime_instance(component_instance);
5808    let thread = GuestTask::new(
5809        state,
5810        Box::new(for_any_lower(move |store, params| {
5811            lower_params(handle, token.as_context_mut(store), params)
5812        })),
5813        LiftResult {
5814            lift: Box::new(for_any_lift(move |store, result| {
5815                lift_result(handle, store, result)
5816            })),
5817            ty: task_return_type,
5818            memory,
5819            string_encoding,
5820        },
5821        Caller::Host {
5822            tx: Some(tx),
5823            host_future_present,
5824            caller,
5825        },
5826        callback.map(|callback| {
5827            let callback = SendSyncPtr::new(callback);
5828            let instance = handle.instance();
5829            Box::new(move |store: &mut dyn VMStore, event, handle| {
5830                let store = token.as_context_mut(store);
5831                // SAFETY: Per the contract of `prepare_call`, the callback
5832                // will remain valid at least as long is this task exists.
5833                unsafe { instance.call_callback(store, callback, event, handle) }
5834            }) as CallbackFn
5835        }),
5836        instance,
5837        async_function,
5838    )?;
5839
5840    if !store.0.may_enter(instance)? {
5841        bail!(Trap::CannotEnterComponent);
5842    }
5843
5844    Ok(PreparedCall {
5845        handle,
5846        thread,
5847        param_count,
5848        runtime_instance: instance,
5849        rx,
5850        _phantom: PhantomData,
5851    })
5852}
5853
5854pub(crate) struct QueuedCall<R> {
5855    store: StoreId,
5856    task: TableId<GuestTask>,
5857    rx: oneshot::Receiver<LiftedResult>,
5858    _marker: PhantomData<fn() -> R>,
5859}
5860
5861impl<R> QueuedCall<R> {
5862    /// Queue a call previously prepared using `prepare_call` to be run as part of
5863    /// the associated `ComponentInstance`'s event loop.
5864    ///
5865    /// The returned future will resolve to the result once it is available, but
5866    /// must only be polled via the instance's event loop. See
5867    /// `StoreContextMut::run_concurrent` for details.
5868    pub(crate) fn new<T: 'static>(
5869        mut store: StoreContextMut<T>,
5870        prepared: PreparedCall<R>,
5871    ) -> Result<QueuedCall<R>> {
5872        let PreparedCall {
5873            handle,
5874            thread,
5875            param_count,
5876            rx,
5877            ..
5878        } = prepared;
5879
5880        queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5881
5882        Ok(QueuedCall {
5883            store: store.0.id(),
5884            task: thread.task,
5885            rx,
5886            _marker: PhantomData,
5887        })
5888    }
5889
5890    fn task(&self) -> GuestTaskId {
5891        GuestTaskId(self.task)
5892    }
5893}
5894
5895impl<R> Future for QueuedCall<R>
5896where
5897    R: 'static,
5898{
5899    type Output = Result<R>;
5900
5901    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5902        check_ambient_store(self.store);
5903        Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5904            Ok(r) => match r.downcast() {
5905                Ok(r) => Ok(*r),
5906                Err(_) => bail_bug!("wrong type of value produced"),
5907            },
5908            Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5909        })
5910    }
5911}
5912
5913/// Queue a call previously prepared using `prepare_call` to be run as part of
5914/// the associated `ComponentInstance`'s event loop.
5915fn queue_call0<T: 'static>(
5916    store: StoreContextMut<T>,
5917    handle: Func,
5918    guest_thread: QualifiedThreadId,
5919    param_count: usize,
5920) -> Result<()> {
5921    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5922    let is_concurrent = raw_options.async_;
5923    let callback = raw_options.callback;
5924    let instance = handle.instance();
5925    let callee = handle.lifted_core_func(store.0);
5926    let post_return = handle.post_return_core_func(store.0);
5927    let callback = callback.map(|i| {
5928        let instance = instance.id().get(store.0);
5929        SendSyncPtr::new(instance.runtime_callback(i))
5930    });
5931
5932    log::trace!("queueing call {guest_thread:?}");
5933
5934    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5935    // (with signatures appropriate for this call) and will remain valid as
5936    // long as this instance is valid.
5937    unsafe {
5938        instance.queue_call(
5939            store,
5940            guest_thread,
5941            SendSyncPtr::new(callee),
5942            param_count,
5943            1,
5944            is_concurrent,
5945            callback,
5946            post_return.map(SendSyncPtr::new),
5947        )
5948    }
5949}