Skip to main content

wasmtime/runtime/component/
concurrent.rs

1//! Runtime support for the Component Model Async ABI.
2//!
3//! This module and its submodules provide host runtime support for Component
4//! Model Async features such as async-lifted exports, async-lowered imports,
5//! streams, futures, and related intrinsics.  See [the Async
6//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7//! for a high-level overview.
8//!
9//! At the core of this support is an event loop which schedules and switches
10//! between guest tasks and any host tasks they create.  Each
11//! `Store` will have at most one event loop running at any given
12//! time, and that loop may be suspended and resumed by the host embedder using
13//! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14//! function contains the loop itself, while the
15//! `StoreOpaque::concurrent_state` field holds its state.
16//!
17//! # Public API Overview
18//!
19//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23//!
//! - `StoreContextMut::run_concurrent`: Run the event loop for the store,
//! allowing any and all tasks belonging to that store to make
//! progress.
27//!
//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
//! for the store.
30//!
31//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32//! `stream` which may be passed to the guest.  This takes a
33//! `{Future,Stream}Producer` implementation which will be polled for items when
34//! the consumer requests them.
35//!
36//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37//! connecting it to a `{Future,Stream}Consumer` which will consume any items
38//! produced by the write end.
39//!
40//! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41//!
42//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43//! function with the linker.  That function will take an `Accessor` as its
44//! first parameter, which provides access to the store between (but not across)
45//! await points.
46//!
47//! - `Accessor::with`: Access the store and its associated data.
48//!
49//! - `Accessor::spawn`: Run a background task as part of the event loop for the
50//! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51//! in host functions.
52
53use self::error_contexts::GlobalErrorContextRefCount;
54use crate::bail_bug;
55use crate::component::func::{Func, call_post_return};
56use crate::component::{
57    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
58};
59use crate::fiber::{self, StoreFiber, StoreFiberYield};
60use crate::hash_set::HashSet;
61#[cfg(feature = "gc")]
62use crate::module::ModuleRegistry;
63use crate::prelude::*;
64use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
65#[cfg(feature = "gc")]
66use crate::vm::GcRootsList;
67use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
68use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMLazyThread, VMMemoryDefinition, VMStore};
69use crate::{
70    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
71};
72use alloc::borrow::ToOwned;
73use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
74use core::any::Any;
75use core::cell::UnsafeCell;
76use core::fmt;
77use core::future;
78use core::future::Future;
79use core::marker::PhantomData;
80use core::mem::{self, ManuallyDrop, MaybeUninit};
81use core::ops::DerefMut;
82use core::pin::{Pin, pin};
83use core::ptr::{self, NonNull};
84use core::task::{Context, Poll, Waker};
85use futures::channel::oneshot;
86use futures::stream::{FuturesUnordered, StreamExt};
87use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
88use table::{TableDebug, TableId};
89use wasmtime_environ::component::{
90    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
91    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95};
96use wasmtime_environ::packed_option::ReservedValue;
97use wasmtime_environ::{NUM_COMPONENT_CONTEXT_SLOTS, Trap};
98#[cfg(feature = "gc")]
99use wasmtime_unwinder::Unwind;
100
101pub use abort::JoinHandle;
102pub use func::{FuncCallConcurrent, TypedFuncCallConcurrent};
103pub use future_stream_any::{FutureAny, StreamAny};
104pub use futures_and_streams::{
105    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
106    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
107    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
108};
109pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
110
111mod abort;
112mod error_contexts;
113mod func;
114mod future_stream_any;
115mod futures_and_streams;
116pub(crate) mod table;
117pub(crate) mod tls;
118
/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
///
/// The value is the all-ones 32-bit pattern (i.e. `u32::MAX`).
const BLOCKED: u32 = 0xffff_ffff;
122
/// Mirrors `CallState` from the upstream spec.
///
/// Every variant has an explicit discriminant; that discriminant is what
/// [`Status::pack`] places in the low bits of the packed value.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional `waitable` handle into the single
    /// 32-bit value that the canonical ABI requires.
    ///
    /// Bits `0..4` hold the status discriminant and bits `4..32` hold the
    /// waitable handle (zero when there is none).
    ///
    /// # Panics
    ///
    /// Panics if a waitable is supplied together with `Status::Returned`, if
    /// no waitable is supplied for any other status, or if the waitable does
    /// not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        // Only a returned call is reported without an accompanying waitable.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or_default();
        // The handle must leave room for the four status bits.
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
146
/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
///
/// `Event::parts` lowers a value of this type to the pair of integers
/// delivered to the guest. The `pending` fields are host-side bookkeeping and
/// are not part of that pair.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; lowered with a zero payload.
    None,
    /// A subtask event whose payload is the subtask's `status`.
    Subtask {
        status: Status,
    },
    /// A stream read event whose payload is the encoded `code`.
    StreamRead {
        code: ReturnCode,
        // NOTE(review): presumably the stream table type and handle this event
        // still refers to -- confirm against the code that reads `pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write event whose payload is the encoded `code`.
    StreamWrite {
        code: ReturnCode,
        // NOTE(review): see `StreamRead::pending`.
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read event whose payload is the encoded `code`.
    FutureRead {
        code: ReturnCode,
        // NOTE(review): presumably the future table type and handle this event
        // still refers to -- confirm against the code that reads `pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write event whose payload is the encoded `code`.
    FutureWrite {
        code: ReturnCode,
        // NOTE(review): see `FutureRead::pending`.
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A cancellation event; lowered with a zero payload.
    Cancelled,
}
173
174impl Event {
175    /// Lower this event to core Wasm integers for delivery to the guest.
176    ///
177    /// Note that the waitable handle, if any, is assumed to be lowered
178    /// separately.
179    fn parts(self) -> (u32, u32) {
180        const EVENT_NONE: u32 = 0;
181        const EVENT_SUBTASK: u32 = 1;
182        const EVENT_STREAM_READ: u32 = 2;
183        const EVENT_STREAM_WRITE: u32 = 3;
184        const EVENT_FUTURE_READ: u32 = 4;
185        const EVENT_FUTURE_WRITE: u32 = 5;
186        const EVENT_CANCELLED: u32 = 6;
187        match self {
188            Event::None => (EVENT_NONE, 0),
189            Event::Cancelled => (EVENT_CANCELLED, 0),
190            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
191            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
192            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
193            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
194            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
195        }
196    }
197}
198
/// Corresponds to `CallbackCode` in the spec.
///
/// NOTE(review): the per-constant notes below are inferred from the constant
/// names only -- confirm against the spec's `CallbackCode` definition.
mod callback_code {
    /// Code `0`; presumably means the callback is finished with the task.
    pub const EXIT: u32 = 0;
    /// Code `1`; presumably means the callback wants to yield and run again.
    pub const YIELD: u32 = 1;
    /// Code `2`; presumably means the callback wants to wait for an event.
    pub const WAIT: u32 = 2;
}
205
/// A flag indicating that the callee is an async-lifted export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
/// The value is re-exported from `wasmtime_environ` and converted to `u32`.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
210
/// Provides access to either store data (via the `get` method) or the store
/// itself (via [`AsContext`]/[`AsContextMut`]).
///
/// See [`Accessor::with`] for details.
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    /// Mutable handle to the store, exposed through the
    /// [`AsContext`]/[`AsContextMut`] implementations.
    store: StoreContextMut<'a, T>,
    /// Projection from the store's `T` to `D::Data<'_>`, applied by `get`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
220
221impl<'a, T, D> Access<'a, T, D>
222where
223    D: HasData + ?Sized,
224    T: 'static,
225{
226    /// Creates a new [`Access`] from its component parts.
227    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
228        Self { store, get_data }
229    }
230
231    /// Get mutable access to the store data.
232    pub fn data_mut(&mut self) -> &mut T {
233        self.store.data_mut()
234    }
235
236    /// Get mutable access to the store data.
237    pub fn get(&mut self) -> D::Data<'_> {
238        (self.get_data)(self.data_mut())
239    }
240
241    /// Spawn a background task.
242    ///
243    /// See [`Accessor::spawn`] for details.
244    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
245    where
246        T: 'static,
247    {
248        let accessor = Accessor {
249            get_data: self.get_data,
250            token: StoreToken::new(self.store.as_context_mut()),
251        };
252        self.store
253            .as_context_mut()
254            .spawn_with_accessor(accessor, task)
255    }
256
257    /// Returns the getter this accessor is using to project from `T` into
258    /// `D::Data`.
259    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
260        self.get_data
261    }
262}
263
/// Lets an [`Access`] be passed wherever a shared store context is expected by
/// delegating to the wrapped `StoreContextMut`.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// The store's data type.
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
275
/// Lets an [`Access`] be passed wherever a mutable store context is expected
/// by delegating to the wrapped `StoreContextMut`.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
285
286/// Provides scoped mutable access to store data in the context of a concurrent
287/// host task future.
288///
289/// This allows multiple host task futures to execute concurrently and access
290/// the store between (but not across) `await` points.
291///
292/// # Rationale
293///
294/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
295/// `D::Data<'_>`. The problem this is solving, however, is that it does not
296/// literally store these values. The basic problem is that when a concurrent
297/// host future is being polled it has access to `&mut T` (and the whole
298/// `Store`) but when it's not being polled it does not have access to these
299/// values. This reflects how the store is only ever polling one future at a
300/// time so the store is effectively being passed between futures.
301///
302/// Rust's `Future` trait, however, has no means of passing a `Store`
303/// temporarily between futures. The [`Context`](core::task::Context) type does
304/// not have the ability to attach arbitrary information to it at this time.
305/// This type, [`Accessor`], is used to bridge this expressivity gap.
306///
307/// The [`Accessor`] type here represents the ability to acquire, temporarily in
308/// a synchronous manner, the current store. The [`Accessor::with`] function
309/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
310/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
311/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
313/// how the store is temporarily made available while a host future is being
314/// polled.
315///
316/// # Implementation
317///
318/// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
319/// this type additionally doesn't even have a lifetime parameter. This is
320/// instead a representation of proof of the ability to acquire these while a
321/// future is being polled. Wasmtime will, when it polls a host future,
322/// configure ambient state such that the `Accessor` that a future closes over
323/// will work and be able to access the store.
324///
325/// This has a number of implications for users such as:
326///
327/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
328///   the lifetime of a single future.
329/// * A future is expected to, however, close over an `Accessor` and keep it
330///   alive probably for the duration of the entire future.
331/// * Different host futures will be given different `Accessor`s, and that's
332///   intentional.
333/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
334///   alleviates some otherwise required bounds to be written down.
335///
336/// # Using `Accessor` in `Drop`
337///
338/// The methods on `Accessor` are only expected to work in the context of
339/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
340/// host future can be dropped at any time throughout the system and Wasmtime
341/// store context is not necessarily available at that time. It's recommended to
342/// not use `Accessor` methods in anything connected to a `Drop` implementation
343/// as they will panic and have unintended results. If you run into this though
344/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token used by `Accessor::with` to turn the ambient store into a
    /// `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store's `T` to `D::Data<'_>`, handed to every
    /// `Access` this accessor creates.
    get_data: fn(&mut T) -> D::Data<'_>,
}
352
/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor<T, D>` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store<T>` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor<T, D>`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    ///
    /// The accessor is only borrowed: it is returned by shared reference and
    /// cannot outlive `self`.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
380
381impl<T: AsAccessor + ?Sized> AsAccessor for &T {
382    type Data = T::Data;
383    type AccessorData = T::AccessorData;
384
385    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
386        T::as_accessor(self)
387    }
388}
389
390impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
391    type Data = T;
392    type AccessorData = D;
393
394    fn as_accessor(&self) -> &Accessor<T, D> {
395        self
396    }
397}
398
399// Note that it is intentional at this time that `Accessor` does not actually
400// store `&mut T` or anything similar. This distinctly enables the `Accessor`
401// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
402// that matter). This is used to ergonomically simplify bindings where the
403// majority of the time `Accessor` is closed over in a future which then needs
404// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
405// you already have to write `T: 'static`...) it helps to avoid this.
406//
407// Note as well that `Accessor` doesn't actually store its data at all. Instead
408// it's more of a "proof" of what can be accessed from TLS. API design around
409// `Accessor` and functions like `Linker::func_wrap_concurrent` are
410// intentionally made to ensure that `Accessor` is ideally only used in the
411// context that TLS variables are actually set. For example host functions are
412// given `&Accessor`, not `Accessor`, and this prevents them from persisting
413// the value outside of a future. Within the future the TLS variables are all
414// guaranteed to be set while the future is being polled.
415//
416// Finally though this is not an ironclad guarantee, but nor does it need to be.
417// The TLS APIs are designed to panic or otherwise model usage where they're
418// called recursively or similar. It's hoped that code cannot be constructed to
419// actually hit this at runtime but this is not a safety requirement at this
420// time.
// Compile-time check that `Accessor<T>` is `Send + Sync` even when `T` itself
// is not `Sync`; `UnsafeCell<u32>` stands in for such a `T`.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
425
426impl<T> Accessor<T> {
427    /// Creates a new `Accessor` backed by the specified functions.
428    ///
429    /// - `get`: used to retrieve the store
430    ///
431    /// - `get_data`: used to "project" from the store's associated data to
432    /// another type (e.g. a field of that data or a wrapper around it).
433    ///
434    /// - `spawn`: used to queue spawned background tasks to be run later
435    pub(crate) fn new(token: StoreToken<T>) -> Self {
436        Self {
437            token,
438            get_data: |x| x,
439        }
440    }
441}
442
443impl<T, D> Accessor<T, D>
444where
445    D: HasData + ?Sized,
446{
447    /// Run the specified closure, passing it mutable access to the store.
448    ///
449    /// This function is one of the main building blocks of the [`Accessor`]
450    /// type. This yields synchronous, blocking, access to the store via an
451    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
452    /// providing the ability to access `D` via [`Access::get`]. Note that the
453    /// `fun` here is given only temporary access to the store and `T`/`D`
454    /// meaning that the return value `R` here is not allowed to capture borrows
455    /// into the two. If access is needed to data within `T` or `D` outside of
456    /// this closure then it must be `clone`d out, for example.
457    ///
458    /// # Panics
459    ///
460    /// This function will panic if it is call recursively with any other
461    /// accessor already in scope. For example if `with` is called within `fun`,
462    /// then this function will panic. It is up to the embedder to ensure that
463    /// this does not happen.
464    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
465        tls::get(|vmstore| {
466            fun(Access {
467                store: self.token.as_context_mut(vmstore),
468                get_data: self.get_data,
469            })
470        })
471    }
472
473    /// Returns the getter this accessor is using to project from `T` into
474    /// `D::Data`.
475    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
476        self.get_data
477    }
478
479    /// Changes this accessor to access `D2` instead of the current type
480    /// parameter `D`.
481    ///
482    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
483    ///
484    /// # Panics
485    ///
486    /// When using this API the returned value is disconnected from `&self` and
487    /// the lifetime binding the `self` argument. An `Accessor` only works
488    /// within the context of the closure or async closure that it was
489    /// originally given to, however. This means that due to the fact that the
490    /// returned value has no lifetime connection it's possible to use the
491    /// accessor outside of `&self`, the original accessor, and panic.
492    ///
493    /// The returned value should only be used within the scope of the original
494    /// `Accessor` that `self` refers to.
495    pub fn with_getter<D2: HasData>(
496        &self,
497        get_data: fn(&mut T) -> D2::Data<'_>,
498    ) -> Accessor<T, D2> {
499        Accessor {
500            token: self.token,
501            get_data,
502        }
503    }
504
505    /// Spawn a background task which will receive an `&Accessor<T, D>` and
506    /// run concurrently with any other tasks in progress for the current
507    /// store.
508    ///
509    /// This is particularly useful for host functions which return a `stream`
510    /// or `future` such that the code to write to the write end of that
511    /// `stream` or `future` must run after the function returns.
512    ///
513    /// The returned [`JoinHandle`] may be used to cancel the task.
514    ///
515    /// # Panics
516    ///
517    /// Panics if called within a closure provided to the [`Accessor::with`]
518    /// function. This can only be called outside an active invocation of
519    /// [`Accessor::with`].
520    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> Result<JoinHandle>
521    where
522        T: 'static,
523    {
524        let accessor = self.clone_for_spawn();
525        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
526    }
527
528    fn clone_for_spawn(&self) -> Self {
529        Self {
530            token: self.token,
531            get_data: self.get_data,
532        }
533    }
534
535    /// Polls to see if this store contains any "interesting" tasks still within
536    /// it.
537    ///
538    /// Returns `Poll::Ready(())` if there are no more interesting tasks, and
539    /// otherwise returns `Poll::Pending`. If pending is returned then whenever
540    /// the last remaining "interesting" task has exited the provided context's
541    /// waker will be notified. Note that only the waker passed to the last call
542    /// to `poll_no_interesting_tasks` will be notified, so this is only
543    /// appropriate to use one-at-a-time.
544    ///
545    /// The component model specification, as of this current date, does not
546    /// have a distinction between "interesting" tasks and not. The current
547    /// intention is that in a future revision of the component model this will
548    /// be distinguished at the component ABI level where tasks will be able to
549    /// flag themselves as "interesting" optionally. Additionally extra work can
550    /// be opted-in to being "interesting".
551    ///
552    /// For now what this means is that all component model tasks within this
553    /// store are considered interesting. This specifically includes the entire
554    /// duration of a task, so even all of the time after a task has returned
555    /// but before it has exited. This means that this function is, today,
556    /// effectively a proxy for "are there any more tasks still running in this
557    /// store". This can be used by embedders to determine whether there's any
558    /// more work going on, even in the background, for a particular guest.
559    /// Hosts can use this as a signal that the guest wants to stay alive a
560    /// little longer, even after a task has returned.
561    ///
562    /// In the future this predicate won't include all tasks in this store. Some
563    /// tasks will be able to flag themselves as not interesting, meaning that
564    /// when this returns ready it'd be possible that there are still tasks
565    /// remaining in the store.
566    ///
567    /// Note that at this time spawned threads within a task are always
568    /// considered uninteresting. If this function returns ready, then spawned
569    /// threads may still be in the store.
570    pub fn poll_no_interesting_tasks(&self, cx: &mut Context<'_>) -> Poll<()> {
571        self.with(|mut access| {
572            let store = access.as_context_mut().0;
573            let state = store.concurrent_state_mut_without_forcing_current_thread();
574            if state.interesting_tasks == 0 {
575                Poll::Ready(())
576            } else {
577                state.interesting_tasks_empty_waker = Some(cx.waker().clone());
578                Poll::Pending
579            }
580        })
581    }
582}
583
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `core::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// The task is consumed and is handed a borrowed `accessor`. The returned
    /// future must be `Send` and resolves to the task's `Result`.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
602
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw core Wasm values for the call's parameters.
        params: Vec<ValRaw>,
        // NOTE(review): presumably whether the callee produces a result the
        // caller must receive -- confirm against the code that reads it.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw core Wasm values for the call's parameters.
        params: Vec<ValRaw>,
        // NOTE(review): presumably the number of flat core Wasm results the
        // caller expects -- confirm against the code that reads it.
        result_count: u32,
    },
}
617
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    // NOTE(review): the payload is presumably the suspended fiber to resume
    // once an event arrives -- confirm against the code that consumes it.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    // NOTE(review): the payload is presumably the instance whose callback is
    // to be invoked -- confirm against the code that consumes it.
    Callback(Instance),
}
626
/// Represents the reason a fiber is suspending itself.
// NOTE(review): the `skip_may_block_check` fields presumably bypass a "may
// this task block?" check made when suspending, and `thread` presumably names
// the guest thread running on the fiber -- confirm against the suspend logic.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        thread: QualifiedThreadId,
        // NOTE(review): presumably whether the yield may be interrupted by
        // cancellation -- confirm.
        cancellable: bool,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
653
/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
        ///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option<TableId<WaitableSet>>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// A pending start whose closure yields no follow-up call.
    ///
    /// `GuestCall::is_ready` always reports this kind as ready.
    // NOTE(review): how this differs semantically from `StartImplicit` is not
    // visible here -- confirm against the code that constructs it.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
675
676impl fmt::Debug for GuestCallKind {
677    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
678        match self {
679            Self::DeliverEvent { instance, set } => f
680                .debug_struct("DeliverEvent")
681                .field("instance", instance)
682                .field("set", set)
683                .finish(),
684            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
685            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
686        }
687    }
688}
689
/// The target of a suspension intrinsic.
// NOTE(review): the `u32` payloads presumably identify the thread to switch
// to, with `SomeSuspended` requiring that thread to already be suspended --
// confirm against the intrinsics that construct these values.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for [`SuspensionTarget::None`].
    fn is_none(&self) -> bool {
        match self {
            Self::None => true,
            Self::SomeSuspended(_) | Self::Some(_) => false,
        }
    }

    /// Returns `true` for every variant that names a target.
    fn is_some(&self) -> bool {
        match self {
            Self::SomeSuspended(_) | Self::Some(_) => true,
            Self::None => false,
        }
    }
}
706
/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    /// The guest thread the call is for; its `task` is what
    /// `GuestCall::is_ready` uses to look up the owning instance.
    thread: QualifiedThreadId,
    /// What kind of call this is (event delivery or task start).
    kind: GuestCallKind,
}
713
714impl GuestCall {
715    /// Returns whether or not the call is ready to run.
716    ///
717    /// A call will not be ready to run if either:
718    ///
719    /// - the (sub-)component instance to be called has already been entered and
720    /// cannot be reentered until an in-progress call completes
721    ///
722    /// - the call is for a not-yet started task and the (sub-)component
723    /// instance to be called has backpressure enabled
724    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
725        let instance = store
726            .concurrent_state_mut()?
727            .get_mut(self.thread.task)?
728            .instance;
729        let state = store.instance_state(instance).concurrent_state();
730
731        let ready = match &self.kind {
732            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
733            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
734            GuestCallKind::StartExplicit(_) => true,
735        };
736        log::trace!(
737            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
738            state.do_not_enter,
739            state.backpressure
740        );
741        Ok(ready)
742    }
743}
744
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot closure which is given the store when run.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
750
/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume.
    // NOTE(review): the instance index is presumably the instance the thread
    // runs in -- confirm against the code that queues this item.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task.
    // NOTE(review): the instance index is presumably the instance being
    // called -- confirm against the code that queues this item.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
765
766impl fmt::Debug for WorkItem {
767    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
768        match self {
769            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
770            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
771            Self::ResumeThread(instance, thread) => f
772                .debug_tuple("ResumeThread")
773                .field(instance)
774                .field(thread)
775                .finish(),
776            Self::GuestCall(instance, call) => f
777                .debug_tuple("GuestCall")
778                .field(instance)
779                .field(call)
780                .finish(),
781            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
782        }
783    }
784}
785
/// Whether a suspension intrinsic was cancelled or completed
///
/// `Ord`/`PartialOrd` are derived, so the declaration order of the variants
/// is significant: `Cancelled` compares less than `Completed`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension was cancelled.
    Cancelled,
    /// The suspension completed.
    Completed,
}
792
793/// Poll the specified future until it completes on behalf of a guest->host call
794/// using a sync-lowered import.
795///
796/// This is similar to `Instance::first_poll` except it's for sync-lowered
797/// imports, meaning we don't need to handle cancellation and we can block the
798/// caller until the task completes, at which point the caller can handle
799/// lowering the result to the guest's stack and linear memory.
800pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
801    store: &mut dyn VMStore,
802    future: impl Future<Output = Result<R>> + Send + 'static,
803) -> Result<R> {
804    let task = store.current_host_thread()?;
805
806    // Wrap the future in a closure which will take care of stashing the result
807    // in `GuestTask::result` and resuming this fiber when the host task
808    // completes.
809    let mut future = Box::pin(async move {
810        let result = future.await?;
811        tls::get(move |store| {
812            let state = store.concurrent_state_mut()?;
813            let host_state = &mut state.get_mut(task)?.state;
814            assert!(matches!(host_state, HostTaskState::CalleeStarted));
815            *host_state = HostTaskState::CalleeFinished(Box::new(result));
816
817            Waitable::Host(task).set_event(
818                state,
819                Some(Event::Subtask {
820                    status: Status::Returned,
821                }),
822            )?;
823
824            Ok(())
825        })
826    }) as HostTaskFuture;
827
828    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
829    // add the future to `ConcurrentState::futures` and poll it automatically
830    // from the event loop if it doesn't complete immediately here.
831    let poll = tls::set(store, || {
832        future
833            .as_mut()
834            .poll(&mut Context::from_waker(&Waker::noop()))
835    });
836
837    match poll {
838        // It completed immediately; check the result and delete the task.
839        Poll::Ready(result) => result?,
840
841        // It did not complete immediately; add it to
842        // `ConcurrentState::futures` so it will be polled via the event loop;
843        // then use `GuestThread::sync_call_set` to wait for the task to
844        // complete, suspending the current fiber until it does so.
845        Poll::Pending => {
846            let state = store.concurrent_state_mut()?;
847            state.push_future(future);
848
849            let caller = state.get_mut(task)?.caller;
850            let set = state.get_mut(caller.thread)?.sync_call_set;
851            Waitable::Host(task).join(state, Some(set))?;
852
853            store.suspend(SuspendReason::Waiting {
854                set,
855                thread: caller,
856                skip_may_block_check: false,
857            })?;
858
859            // Remove the `task` from the `sync_call_set` to ensure that when
860            // this function returns and the task is deleted that there are no
861            // more lingering references to this host task.
862            Waitable::Host(task).join(store.concurrent_state_mut()?, None)?;
863        }
864    }
865
866    // Retrieve and return the result.
867    let host_state = &mut store.concurrent_state_mut()?.get_mut(task)?.state;
868    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
869        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
870            Ok(result) => *result,
871            Err(_) => bail_bug!("host task finished with wrong type of result"),
872        }),
873        _ => bail_bug!("unexpected host task state after completion"),
874    }
875}
876
877/// Execute the specified guest call.
878fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
879    let mut next = Some(call);
880    while let Some(call) = next.take() {
881        match call.kind {
882            GuestCallKind::DeliverEvent { instance, set } => {
883                let (event, waitable) =
884                    match instance.get_event(store, call.thread.task, set, true)? {
885                        Some(pair) => pair,
886                        None => bail_bug!("delivering non-present event"),
887                    };
888                let state = store.concurrent_state_mut()?;
889                let task = state.get_mut(call.thread.task)?;
890                let runtime_instance = task.instance;
891                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
892
893                log::trace!(
894                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
895                    call.thread,
896                );
897
898                let old_thread = store.set_thread(call.thread)?;
899                log::trace!(
900                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
901                    call.thread
902                );
903
904                store.enter_instance(runtime_instance);
905
906                let Some(callback) = store
907                    .concurrent_state_mut()?
908                    .get_mut(call.thread.task)?
909                    .callback
910                    .take()
911                else {
912                    bail_bug!("guest task callback field not present")
913                };
914
915                let code = callback(store, event, handle)?;
916
917                store
918                    .concurrent_state_mut()?
919                    .get_mut(call.thread.task)?
920                    .callback = Some(callback);
921
922                store.exit_instance(runtime_instance)?;
923
924                store.set_thread(old_thread)?;
925
926                next = instance.handle_callback_code(
927                    store,
928                    call.thread,
929                    runtime_instance.index,
930                    code,
931                )?;
932
933                log::trace!(
934                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
935                );
936            }
937            GuestCallKind::StartImplicit(fun) => {
938                next = fun(store)?;
939            }
940            GuestCallKind::StartExplicit(fun) => {
941                fun(store)?;
942            }
943        }
944    }
945
946    Ok(())
947}
948
949impl<T> Store<T> {
950    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
951    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
952    where
953        T: Send + 'static,
954    {
955        ensure!(
956            self.as_context().0.concurrency_support(),
957            "cannot use `run_concurrent` when Config::concurrency_support disabled",
958        );
959        self.as_context_mut().run_concurrent(fun).await
960    }
961
962    #[doc(hidden)]
963    pub fn assert_concurrent_state_empty(&mut self) {
964        self.as_context_mut().assert_concurrent_state_empty();
965    }
966
967    #[doc(hidden)]
968    pub fn concurrent_state_table_size(&mut self) -> usize {
969        self.as_context_mut().concurrent_state_table_size()
970    }
971
972    /// Convenience wrapper for [`StoreContextMut::spawn`].
973    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> Result<JoinHandle>
974    where
975        T: 'static,
976    {
977        self.as_context_mut().spawn(task)
978    }
979}
980
981impl<T> StoreContextMut<'_, T> {
982    /// Assert that all the relevant tables and queues in the concurrent state
983    /// for this store are empty.
984    ///
985    /// This is for sanity checking in integration tests
986    /// (e.g. `component-async-tests`) that the relevant state has been cleared
987    /// after each test concludes.  This should help us catch leaks, e.g. guest
988    /// tasks which haven't been deleted despite having completed and having
989    /// been dropped by their supertasks.
990    ///
991    /// Only intended for use in Wasmtime's own testing.
992    #[doc(hidden)]
993    pub fn assert_concurrent_state_empty(self) {
994        let store = self.0;
995        store
996            .store_data_mut()
997            .components
998            .assert_instance_states_empty();
999        let state = store.concurrent_state_mut().unwrap();
1000        assert!(
1001            state.table.get_mut().is_empty(),
1002            "non-empty table: {:?}",
1003            state.table.get_mut()
1004        );
1005        assert!(state.high_priority.is_empty());
1006        assert!(state.low_priority.is_empty());
1007        assert!(state.unforced_current_thread.is_none());
1008        assert!(state.futures_mut().unwrap().is_empty());
1009        assert!(state.global_error_context_ref_counts.is_empty());
1010    }
1011
1012    /// Helper function to perform tests over the size of the concurrent state
1013    /// table which can be useful for detecting leaks.
1014    ///
1015    /// Only intended for use in Wasmtime's own testing.
1016    #[doc(hidden)]
1017    pub fn concurrent_state_table_size(&mut self) -> usize {
1018        self.0
1019            .concurrent_state_mut()
1020            .unwrap()
1021            .table
1022            .get_mut()
1023            .iter_mut()
1024            .count()
1025    }
1026
1027    /// Spawn a background task to run as part of this instance's event loop.
1028    ///
1029    /// The task will receive an `&Accessor<U>` and run concurrently with
1030    /// any other tasks in progress for the instance.
1031    ///
1032    /// Note that the task will only make progress if and when the event loop
1033    /// for this instance is run.
1034    ///
1035    /// The returned [`JoinHandle`] may be used to cancel the task.
1036    pub fn spawn(mut self, task: impl AccessorTask<T>) -> Result<JoinHandle>
1037    where
1038        T: 'static,
1039    {
1040        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
1041        self.spawn_with_accessor(accessor, task)
1042    }
1043
1044    /// Internal implementation of `spawn` functions where a `store` is
1045    /// available along with an `Accessor`.
1046    fn spawn_with_accessor<D>(
1047        self,
1048        accessor: Accessor<T, D>,
1049        task: impl AccessorTask<T, D>,
1050    ) -> Result<JoinHandle>
1051    where
1052        T: 'static,
1053        D: HasData + ?Sized,
1054    {
1055        // Create an "abortable future" here where internally the future will
1056        // hook calls to poll and possibly spawn more background tasks on each
1057        // iteration.
1058        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1059        self.0
1060            .concurrent_state_mut()?
1061            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1062        Ok(handle)
1063    }
1064
1065    /// Run the specified closure `fun` to completion as part of this store's
1066    /// event loop.
1067    ///
1068    /// This will run `fun` as part of this store's event loop until it
1069    /// yields a result.  `fun` is provided an [`Accessor`], which provides
1070    /// controlled access to the store and its data.
1071    ///
1072    /// This function can be used to invoke [`Func::call_concurrent`] for
1073    /// example within the async closure provided here.
1074    ///
1075    /// This function will unconditionally return an error if
1076    /// [`Config::concurrency_support`] is disabled.
1077    ///
1078    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1079    ///
1080    /// # Store-blocking behavior
1081    ///
1082    /// At this time there are certain situations in which the `Future` returned
1083    /// by the `AsyncFnOnce` passed to this function will not be polled for an
1084    /// extended period of time, despite one or more `Waker::wake` events having
1085    /// occurred for the task to which it belongs.  This can manifest as the
1086    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1087    /// the `Store` being held by e.g. a blocking host function, preventing the
1088    /// `Future` from being polled. A canonical example of this is when the
1089    /// `fun` provided to this function attempts to set a timeout for an
1090    /// invocation of a wasm function. In this situation the async closure is
1091    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1092    /// to elapse. At this time this setup will not always work and the timeout
1093    /// may not reliably fire.
1094    ///
1095    /// This function will not block the current thread and as such is always
1096    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress before the closure can make progress. This
    /// is an artifact of Wasmtime's historical implementation of `async`
    /// functions and is the topic of [#11869] and [#11870]. In the timeout
    /// example from above it means that Wasmtime can get "wedged" for a bit
    /// where (a) must progress for a readiness notification of (b) to get
    /// delivered.
1103    ///
1104    /// This effectively means that it's not possible to reliably perform a
1105    /// "select" operation within the `fun` closure, which timeouts for example
1106    /// are based on. Fixing this requires some relatively major refactoring
1107    /// work within Wasmtime itself. This is a known pitfall otherwise and one
1108    /// that is intended to be fixed one day. In the meantime it's recommended
1109    /// to apply timeouts or such to the entire `run_concurrent` call itself
1110    /// rather than internally.
1111    ///
1112    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1113    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1114    ///
1115    /// # Example
1116    ///
1117    /// ```
1118    /// # use {
1119    /// #   wasmtime::{
1120    /// #     error::{Result},
1121    /// #     component::{ Component, Linker, Resource, ResourceTable},
1122    /// #     Config, Engine, Store
1123    /// #   },
1124    /// # };
1125    /// #
1126    /// # struct MyResource(u32);
1127    /// # struct Ctx { table: ResourceTable }
1128    /// #
1129    /// # async fn foo() -> Result<()> {
1130    /// # let mut config = Config::new();
1131    /// # let engine = Engine::new(&config)?;
1132    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1133    /// # let mut linker = Linker::new(&engine);
1134    /// # let component = Component::new(&engine, "")?;
1135    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1136    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1137    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1138    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1139    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1140    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1141    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1142    ///    bar.call_concurrent(accessor, (value.0,)).await?;
1143    ///    Ok(())
1144    /// }).await??;
1145    /// # Ok(())
1146    /// # }
1147    /// ```
1148    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1149    where
1150        T: Send + 'static,
1151    {
1152        ensure!(
1153            self.0.concurrency_support(),
1154            "cannot use `run_concurrent` when Config::concurrency_support disabled",
1155        );
1156        self.do_run_concurrent(fun, false).await
1157    }
1158
1159    pub(super) async fn run_concurrent_trap_on_idle<R>(
1160        self,
1161        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1162    ) -> Result<R>
1163    where
1164        T: Send + 'static,
1165    {
1166        self.do_run_concurrent(fun, true).await
1167    }
1168
    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` and drives the resulting future
    /// with `poll_until`, forwarding `trap_on_idle` to it.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Pairs the store with the future created by `fun` so that the future
        // is dropped inside `tls::set`, i.e. with the store installed in TLS
        // while the future's destructor runs.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        // `accessor` is declared before `dropper` so that it outlives the
        // future stored in `dropper.value`, which borrows it.
        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        // Run the event loop until the future produced by `fun` completes (or
        // the loop gives up, per `trap_on_idle`).
        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }
1212
    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the futures in `ConcurrentState::futures`
    /// completes with an error, if handling a work item fails, or -- when
    /// `trap_on_idle` is true -- with `Trap::AsyncDeadlock` if there are no
    /// pending futures or work items left and `future` still hasn't completed.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts the taken-out `ConcurrentState::futures` back into
        // the store when dropped, including on early return.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self
                        .store
                        .0
                        .concurrent_state_mut_already_forced_current_thread()
                        .futures
                        .get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut()?.futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Outcome of one round of polling: either the caller's future
            // finished, or there is (possibly empty) work to process before
            // polling again.
            enum PollResult<R> {
                Complete(R),
                ProcessWork {
                    ready: Vec<WorkItem>,
                    low_priority: bool,
                },
            }

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if
                // any.  This will be either all of the high-priority work
                // items, or if there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut()?;
                let mut ready = mem::take(&mut state.high_priority);
                let mut low_priority = false;
                if ready.is_empty() {
                    if let Some(item) = state.low_priority.pop_back() {
                        ready.push(item);
                        low_priority = true;
                    }
                }
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork {
                        ready,
                        low_priority,
                    }));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork {
                            ready: Vec::new(),
                            low_priority: false,
                        }))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork {
                    ready,
                    low_priority,
                } => {
                    // Guard which cleans up any work items left unprocessed
                    // if we bail out early (e.g. on error or if this future is
                    // dropped): remaining fibers are disposed and remaining
                    // futures are dropped with the store installed via
                    // `tls::set`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // If we're about to run a low-priority task, first yield to
                    // the executor.  This ensures that it won't be starved of
                    // the ability to e.g. update the readiness of sockets,
                    // etc. which the guest may be using `thread.yield` along
                    // with `waitable-set.poll` to monitor in a CPU-heavy loop.
                    //
                    // This works for e.g. `thread.yield` and callbacks which
                    // return `CALLBACK_CODE_YIELD` because we queue a low
                    // priority item to resume the task (i.e. resume the thread
                    // or call the callback, respectively) just prior to
                    // suspending it.  Indeed, as of this writing those are the
                    // _only_ situations we queue low-priority tasks.
                    // Therefore, we interpret the guest's request to yield as
                    // meaning "yield to other guest tasks _and_/_or_ host
                    // operations such as updating socket readiness", the latter
                    // being the async runtime's responsibility.
                    //
                    // In the future, if this ends up causing measurable
                    // performance issues, this could be optimized such that we
                    // only yield periodically (e.g. for batches of low priority
                    // items) and not for each and every individual item.
                    if low_priority {
                        dispose.store.0.yield_now().await
                    }

                    // Process the batch in order; on error the `Dispose` guard
                    // takes care of whatever is left.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }
1432
1433    /// Handle the specified work item, possibly resuming a fiber if applicable.
1434    async fn handle_work_item(self, item: WorkItem) -> Result<()>
1435    where
1436        T: Send,
1437    {
1438        log::trace!("handle work item {item:?}");
1439        match item {
1440            WorkItem::PushFuture(future) => {
1441                self.0
1442                    .concurrent_state_mut()?
1443                    .futures_mut()?
1444                    .push(future.into_inner());
1445            }
1446            WorkItem::ResumeFiber(fiber) => {
1447                self.0.resume_fiber(fiber).await?;
1448            }
1449            WorkItem::ResumeThread(_, thread) => {
1450                if let GuestThreadState::Ready { fiber, .. } = mem::replace(
1451                    &mut self.0.concurrent_state_mut()?.get_mut(thread.thread)?.state,
1452                    GuestThreadState::Running,
1453                ) {
1454                    self.0.resume_fiber(fiber).await?;
1455                } else {
1456                    bail_bug!("cannot resume non-pending thread {thread:?}");
1457                }
1458            }
1459            WorkItem::GuestCall(_, call) => {
1460                if call.is_ready(self.0)? {
1461                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1462                } else {
1463                    let state = self.0.concurrent_state_mut()?;
1464                    let task = state.get_mut(call.thread.task)?;
1465                    if !task.starting_sent {
1466                        task.starting_sent = true;
1467                        if let GuestCallKind::StartImplicit(_) = &call.kind {
1468                            Waitable::Guest(call.thread.task).set_event(
1469                                state,
1470                                Some(Event::Subtask {
1471                                    status: Status::Starting,
1472                                }),
1473                            )?;
1474                        }
1475                    }
1476
1477                    let instance = state.get_mut(call.thread.task)?.instance;
1478                    self.0
1479                        .instance_state(instance)
1480                        .concurrent_state()
1481                        .pending
1482                        .insert(call.thread, call.kind);
1483                }
1484            }
1485            WorkItem::WorkerFunction(fun) => {
1486                self.run_on_worker(WorkerItem::Function(fun)).await?;
1487            }
1488        }
1489
1490        Ok(())
1491    }
1492
1493    /// Execute the specified guest call on a worker fiber.
1494    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1495    where
1496        T: Send,
1497    {
1498        let worker = if let Some(fiber) = self.0.concurrent_state_mut()?.worker.take() {
1499            fiber
1500        } else {
1501            fiber::make_fiber(self.0, move |store| {
1502                loop {
1503                    let Some(item) = store.concurrent_state_mut()?.worker_item.take() else {
1504                        bail_bug!("worker_item not present when resuming fiber")
1505                    };
1506                    match item {
1507                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1508                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
1509                    }
1510
1511                    store.suspend(SuspendReason::NeedWork)?;
1512                }
1513            })?
1514        };
1515
1516        let worker_item = &mut self.0.concurrent_state_mut()?.worker_item;
1517        assert!(worker_item.is_none());
1518        *worker_item = Some(item);
1519
1520        self.0.resume_fiber(worker).await
1521    }
1522
1523    /// Wrap the specified host function in a future which will call it, passing
1524    /// it an `&Accessor<T>`.
1525    ///
1526    /// See the `Accessor` documentation for details.
1527    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1528    where
1529        T: 'static,
1530        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1531            + Send
1532            + Sync
1533            + 'static,
1534        R: Send + Sync + 'static,
1535    {
1536        let token = StoreToken::new(self);
1537        async move {
1538            let mut accessor = Accessor::new(token);
1539            closure(&mut accessor).await
1540        }
1541    }
1542
1543    /// Returns an iterator over the current async call stack defined by the
1544    /// component model.
1545    ///
1546    /// This can be used, for example to correlate a host import call with which
1547    /// root export task originally called it.
1548    ///
1549    /// Tasks are yielded "youngest first" where the first item in the iterator
1550    /// is the current task, and the last item in the iterator is the original
1551    /// call.
1552    pub fn async_call_stack(&mut self) -> Result<impl Iterator<Item = GuestTaskId>> {
1553        let mut cur = Some(self.0.current_thread()?);
1554        let state = self.0.concurrent_state_mut()?;
1555        Ok(core::iter::from_fn(move || {
1556            while let Some(t) = cur {
1557                cur = state.parent(t);
1558                if let Some(thread) = t.guest() {
1559                    return Some(GuestTaskId(thread.task));
1560                }
1561            }
1562
1563            None
1564        }))
1565    }
1566}
1567
1568impl StoreOpaque {
1569    /// Returns the currently-running thread, promoting any deferred lazy thread
1570    /// into a fully-materialized `CurrentThread`.
1571    pub(crate) fn current_thread(&mut self) -> Result<CurrentThread> {
1572        // Without concurrency support there is nothing to force.
1573        if !self.concurrency_support() {
1574            return Ok(CurrentThread::None);
1575        }
1576
1577        // If the JIT-visible current thread isn't a deferred thread then
1578        // `ConcurrentState` is already up to date.
1579        if !self
1580            .vm_store_context_mut()
1581            .current_thread_mut()
1582            .is_deferred()
1583        {
1584            return Ok(self
1585                .concurrent_state_mut_already_forced_current_thread()
1586                .unforced_current_thread);
1587        }
1588
1589        // The component instance whose adapters pushed the deferred frames; all
1590        // frames in a guest-to-guest, sync-to-sync call chain of fused adapters
1591        // live within a single `wasmtime::component::Instance` (because
1592        // cross-`wasmtime::component::Instance` calls don't go through fused
1593        // adapters), and guest code only ever runs as a guest thread, so the
1594        // chain's base thread is already materialized in `ConcurrentState` and
1595        // we can get the `ComponentInstanceId` shared by the whole chain from
1596        // here.
1597        let state = self.concurrent_state_mut_without_forcing_current_thread();
1598        let id = match state.unforced_current_thread.guest().copied() {
1599            Some(thread) => state.get_mut(thread.task)?.instance.instance,
1600            None => bail_bug!("deferred component-model thread with non-guest base"),
1601        };
1602
1603        // Collect the deferred frames pushed inline by fused adapters, walking
1604        // the `parent` chain from innermost to the base.
1605        let mut frames = Vec::new();
1606        let mut cur = *self.vm_store_context_mut().current_thread_mut();
1607        while let Some(ptr) = cur.as_deferred() {
1608            // SAFETY: `ptr` points at a `VMDeferredThread` living in a fused
1609            // adapter's stack frame that is suspended below us on the stack
1610            // (mid-call, waiting for this nested call to return), so the
1611            // referent is still valid and exclusively ours to read.
1612            let deferred = unsafe { ptr.as_non_null().as_ref() };
1613            frames.push((
1614                deferred.callee_async != 0,
1615                deferred.callee_instance,
1616                deferred.saved_context,
1617            ));
1618            cur = deferred.parent;
1619        }
1620
1621        // Mark the current thread forced *before* replaying so that any
1622        // reentrant `force_current_thread` call short-circuits via the
1623        // non-deferred path above.
1624        *self.vm_store_context_mut().current_thread_mut() = VMLazyThread::forced();
1625
1626        // Save the current context, as we need to overwrite it while replaying
1627        // below.
1628        let current_context = *self.vm_store_context_mut().component_context_mut();
1629
1630        // Replay the deferred `enter_guest_sync_call`s outermost-first so that
1631        // the resulting `ConcurrentState` matches what the non-deferred path
1632        // would have otherwise produced.
1633        for (callee_async, callee_instance, saved_context) in frames.into_iter().rev() {
1634            // Restore the caller's context slots so that we save the correct
1635            // values into the caller's thread, exactly as the non-deferred path
1636            // would have on entry.
1637            *self.vm_store_context_mut().component_context_mut() = saved_context;
1638            let callee = RuntimeInstance {
1639                instance: id,
1640                index: RuntimeComponentInstanceIndex::from_u32(callee_instance),
1641            };
1642            self.enter_guest_sync_call(None, callee_async, callee)?;
1643        }
1644
1645        // Replaying done; restore the current context.
1646        *self.vm_store_context_mut().component_context_mut() = current_context;
1647
1648        Ok(self
1649            .concurrent_state_mut_without_forcing_current_thread()
1650            .unforced_current_thread)
1651    }
1652
1653    fn current_guest_thread(&mut self) -> Result<QualifiedThreadId> {
1654        match self.current_thread()?.guest() {
1655            Some(id) => Ok(*id),
1656            None => bail_bug!("current thread is not a guest thread"),
1657        }
1658    }
1659
1660    fn current_host_thread(&mut self) -> Result<TableId<HostTask>> {
1661        match self.current_thread()?.host() {
1662            Some(id) => Ok(id),
1663            None => bail_bug!("current thread is not a host thread"),
1664        }
1665    }
1666
1667    /// Returns whether there's a pending cancellation on the current guest thread,
1668    /// consuming the event if so.
1669    fn take_pending_cancellation(&mut self) -> Result<bool> {
1670        let thread = self.current_guest_thread()?;
1671        let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1672        if let Some(Event::Cancelled) = task.event {
1673            task.event.take();
1674            return Ok(true);
1675        }
1676        Ok(false)
1677    }
1678
1679    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1680    /// guest-to-guest call or a sync host-to-guest call.
1681    ///
1682    /// This task will only be used for the purpose of handling calls to
1683    /// intrinsic functions; both parameter lowering and result lifting are
1684    /// assumed to be taken care of elsewhere.
1685    ///
1686    /// NB: for sync-to-sync, guest-to-guest calls we delay task construction in
1687    /// fused adapters, see `StoreOpaque::current_thread`, `VMDeferredThread`,
1688    /// and `lower_fact_enter_sync_call`. Make sure all this stuff stays in
1689    /// sync!
1690    pub(crate) fn enter_guest_sync_call(
1691        &mut self,
1692        guest_caller: Option<RuntimeInstance>,
1693        callee_async: bool,
1694        callee: RuntimeInstance,
1695    ) -> Result<()> {
1696        log::trace!("enter sync call {callee:?}");
1697        if !self.concurrency_support() {
1698            return self.enter_call_not_concurrent();
1699        }
1700
1701        let thread = self.current_thread()?;
1702        let state = self.concurrent_state_mut()?;
1703        let instance = if let Some(thread) = thread.guest() {
1704            Some(state.get_mut(thread.task)?.instance)
1705        } else {
1706            None
1707        };
1708        if guest_caller.is_some() {
1709            debug_assert_eq!(instance, guest_caller);
1710        }
1711        let guest_thread = GuestTask::new(
1712            state,
1713            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1714            LiftResult {
1715                lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1716                ty: TypeTupleIndex::reserved_value(),
1717                memory: None,
1718                string_encoding: StringEncoding::Utf8,
1719            },
1720            if let Some(thread) = thread.guest() {
1721                Caller::Guest { thread: *thread }
1722            } else {
1723                Caller::Host {
1724                    tx: None,
1725                    host_future_present: false,
1726                    caller: thread,
1727                }
1728            },
1729            None,
1730            callee,
1731            callee_async,
1732        )?;
1733
1734        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1735            guest_thread.thread,
1736            self,
1737            callee.index,
1738        )?;
1739        self.set_thread(guest_thread)?;
1740
1741        Ok(())
1742    }
1743
1744    /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1745    ///
1746    /// NB: for sync-to-sync, guest-to-guest calls we delay task construction in
1747    /// fused adapters and then when the call returns we check to see if the
1748    /// task's contruction was forced and if not avoid calling out of the JIT
1749    /// code to this function. See `lower_fact_exit_sync_call`. Make sure all
1750    /// this stuff stays in sync!
1751    pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1752        if !self.concurrency_support() {
1753            return Ok(self.exit_call_not_concurrent());
1754        }
1755        let thread = match self.set_thread(CurrentThread::None)?.guest() {
1756            Some(t) => *t,
1757            None => bail_bug!("expected task when exiting"),
1758        };
1759        let task = self.concurrent_state_mut()?.get_mut(thread.task)?;
1760        let instance = task.instance;
1761        let caller = match &task.caller {
1762            &Caller::Guest { thread } => thread.into(),
1763            &Caller::Host { caller, .. } => caller,
1764        };
1765        task.lift_result = None;
1766        task.exited = true;
1767        self.set_thread(caller)?;
1768
1769        log::trace!("exit sync call {instance:?}");
1770        self.cleanup_thread(thread, instance, CleanupTask::Yes)?;
1771
1772        Ok(())
1773    }
1774
1775    /// Similar to `enter_guest_sync_call` except for when the guest makes a
1776    /// transition to the host.
1777    ///
1778    /// FIXME: this is called for all guest->host transitions and performs some
1779    /// relatively expensive table manipulations. This would ideally be
1780    /// optimized to avoid the full allocation of a `HostTask` in at least some
1781    /// situations.
1782    pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1783        if !self.concurrency_support() {
1784            self.enter_call_not_concurrent()?;
1785            return Ok(None);
1786        }
1787        let caller = self.current_guest_thread()?;
1788        let state = self.concurrent_state_mut()?;
1789        let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1790        log::trace!("new host task {task:?}");
1791        self.set_thread(task)?;
1792        Ok(Some(task))
1793    }
1794
1795    /// Invoked before lowering the results of a host task to the guest.
1796    ///
1797    /// This is used to update the current thread annotations within the store
1798    /// to ensure that it reflects the guest task, not the host task, since
1799    /// lowering may execute guest code.
1800    pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1801        if !self.concurrency_support() {
1802            return Ok(());
1803        }
1804        let task = self.current_host_thread()?;
1805        let caller = self.concurrent_state_mut()?.get_mut(task)?.caller;
1806        self.set_thread(caller)?;
1807        Ok(())
1808    }
1809
1810    /// Dual of `host_task_create` and signifies that the host has finished and
1811    /// will be cleaned up.
1812    ///
1813    /// Note that this isn't invoked when the host is invoked asynchronously and
1814    /// the host isn't complete yet. In that situation the host task persists
1815    /// and will be cleaned up separately in `subtask_drop`
1816    pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1817        match task {
1818            Some(task) => {
1819                log::trace!("delete host task {task:?}");
1820                self.concurrent_state_mut()?.delete(task)?;
1821            }
1822            None => {
1823                self.exit_call_not_concurrent();
1824            }
1825        }
1826        Ok(())
1827    }
1828
1829    /// Determine whether the specified instance may be entered from the host.
1830    ///
1831    /// We return `true` here only if all of the following hold:
1832    ///
1833    /// - The top-level instance is not already on the current task's call stack.
1834    /// - The instance is not in need of a post-return function call.
1835    /// - `self` has not been poisoned due to a trap.
1836    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1837        if self.trapped() {
1838            return Ok(false);
1839        }
1840        if !self.concurrency_support() {
1841            return Ok(true);
1842        }
1843        let mut cur = Some(self.current_thread()?);
1844        let state = self.concurrent_state_mut()?;
1845        while let Some(t) = cur {
1846            if let Some(thread) = t.guest() {
1847                let task = state.get_mut(thread.task)?;
1848                // Note that we only compare top-level instance IDs here.
1849                // The idea is that the host is not allowed to recursively
1850                // enter a top-level instance even if the specific leaf
1851                // instance is not on the stack. This the behavior defined
1852                // in the spec, and it allows us to elide runtime checks in
1853                // guest-to-guest adapters.
1854                if task.instance.instance == instance.instance {
1855                    return Ok(false);
1856                }
1857            }
1858            cur = state.parent(t);
1859        }
1860        Ok(true)
1861    }
1862
1863    /// Helper function to retrieve the `InstanceState` for the
1864    /// specified instance.
1865    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1866        self.component_instance_mut(instance.instance)
1867            .instance_state(instance.index)
1868    }
1869
1870    /// Configure the currently running `thread`.
1871    ///
1872    /// This will save off any state necessary for the previous thread, if
1873    /// applicable, and then it'll additionally update state for `thread` if
1874    /// needed too.
1875    fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1876        let thread = thread.into();
1877        let state = self.concurrent_state_mut()?;
1878        let old_thread = mem::replace(&mut state.unforced_current_thread, thread);
1879
1880        // First thing to do after swapping threads is updating the context
1881        // slots for this thread within the store. This restores the behavior of
1882        // `context.{get,set}`. This involves taking the old state out of the
1883        // store, saving it in the thread that's being swapped from, and doing
1884        // the inverse for the new thread. When debug assertions are enabled
1885        // this also leaves behind sentinel values to try to uncover bugs where
1886        // this may be forgotten.
1887        if let Some(old_thread) = old_thread.guest() {
1888            let old_context = *self.vm_store_context_mut().component_context_mut();
1889            self.concurrent_state_mut()?
1890                .get_mut(old_thread.thread)?
1891                .context = old_context;
1892        }
1893        if cfg!(debug_assertions) {
1894            *self.vm_store_context_mut().component_context_mut() =
1895                [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1896        }
1897        if let Some(thread) = thread.guest() {
1898            let thread = self.concurrent_state_mut()?.get_mut(thread.thread)?;
1899            let context = thread.context;
1900            if cfg!(debug_assertions) {
1901                thread.context = [u32::MAX; NUM_COMPONENT_CONTEXT_SLOTS];
1902            }
1903            *self.vm_store_context_mut().component_context_mut() = context;
1904        }
1905
1906        // Each time we switch threads, we conservatively set `task_may_block`
1907        // to `false` for the component instance we're switching away from (if
1908        // any), meaning it will be `false` for any new thread created for that
1909        // instance unless explicitly set otherwise.
1910        //
1911        // Additionally if we're switching to a new thread, set its component
1912        // instance's `task_may_block` according to where it left off.
1913        let state = self.concurrent_state_mut()?;
1914        if let Some(old_thread) = old_thread.guest() {
1915            let instance = state.get_mut(old_thread.task)?.instance.instance;
1916            self.component_instance_mut(instance)
1917                .set_task_may_block(false)
1918        }
1919
1920        if thread.guest().is_some() {
1921            self.set_task_may_block()?;
1922        }
1923
1924        // Keep the JIT-visible current-thread pointer in sync.
1925        *self.vm_store_context_mut().current_thread_mut() = if thread.is_none() {
1926            VMLazyThread::none()
1927        } else {
1928            VMLazyThread::forced()
1929        };
1930
1931        Ok(old_thread)
1932    }
1933
1934    /// Set the global variable representing whether the current task may block
1935    /// prior to entering Wasm code.
1936    fn set_task_may_block(&mut self) -> Result<()> {
1937        let guest_thread = self.current_guest_thread()?;
1938        let state = self.concurrent_state_mut()?;
1939        let instance = state.get_mut(guest_thread.task)?.instance.instance;
1940        let may_block = self.concurrent_state_mut()?.may_block(guest_thread.task)?;
1941        self.component_instance_mut(instance)
1942            .set_task_may_block(may_block);
1943        Ok(())
1944    }
1945
1946    pub(crate) fn check_blocking(&mut self) -> Result<()> {
1947        if !self.concurrency_support() {
1948            return Ok(());
1949        }
1950        let task = self.current_guest_thread()?.task;
1951        let state = self.concurrent_state_mut()?;
1952        let instance = state.get_mut(task)?.instance.instance;
1953        let task_may_block = self.component_instance(instance).get_task_may_block();
1954
1955        if task_may_block {
1956            Ok(())
1957        } else {
1958            Err(Trap::CannotBlockSyncTask.into())
1959        }
1960    }
1961
1962    /// Record that we're about to enter a (sub-)component instance which does
1963    /// not support more than one concurrent, stackful activation, meaning it
1964    /// cannot be entered again until the next call returns.
1965    fn enter_instance(&mut self, instance: RuntimeInstance) {
1966        log::trace!("enter {instance:?}");
1967        self.instance_state(instance)
1968            .concurrent_state()
1969            .do_not_enter = true;
1970    }
1971
1972    /// Record that we've exited a (sub-)component instance previously entered
1973    /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1974    /// See the documentation for the latter for details.
1975    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1976        log::trace!("exit {instance:?}");
1977        self.instance_state(instance)
1978            .concurrent_state()
1979            .do_not_enter = false;
1980        self.partition_pending(instance)
1981    }
1982
1983    /// Iterate over `InstanceState::pending`, moving any ready items into the
1984    /// "high priority" work item queue.
1985    ///
1986    /// See `GuestCall::is_ready` for details.
1987    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1988        for (thread, kind) in
1989            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1990        {
1991            let call = GuestCall { thread, kind };
1992            if call.is_ready(self)? {
1993                self.concurrent_state_mut()?
1994                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
1995            } else {
1996                self.instance_state(instance)
1997                    .concurrent_state()
1998                    .pending
1999                    .insert(call.thread, call.kind);
2000            }
2001        }
2002
2003        Ok(())
2004    }
2005
2006    /// Implements the `backpressure.{inc,dec}` intrinsics.
2007    pub(crate) fn backpressure_modify(
2008        &mut self,
2009        caller_instance: RuntimeInstance,
2010        modify: impl FnOnce(u16) -> Option<u16>,
2011    ) -> Result<()> {
2012        let state = self.instance_state(caller_instance).concurrent_state();
2013        let old = state.backpressure;
2014        let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
2015        state.backpressure = new;
2016
2017        if old > 0 && new == 0 {
2018            // Backpressure was previously enabled and is now disabled; move any
2019            // newly-eligible guest calls to the "high priority" queue.
2020            self.partition_pending(caller_instance)?;
2021        }
2022
2023        Ok(())
2024    }
2025
2026    /// Resume the specified fiber, giving it exclusive access to the specified
2027    /// store.
2028    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
2029        let old_thread = self.current_thread()?;
2030        log::trace!("resume_fiber: save current thread {old_thread:?}");
2031
2032        let fiber = fiber::resolve_or_release(self, fiber).await?;
2033
2034        self.set_thread(old_thread)?;
2035
2036        let state = self.concurrent_state_mut()?;
2037
2038        if let Some(ot) = old_thread.guest() {
2039            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
2040        }
2041        log::trace!("resume_fiber: restore current thread {old_thread:?}");
2042
2043        if let Some(mut fiber) = fiber {
2044            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
2045            // See the `SuspendReason` documentation for what each case means.
2046            let reason = match state.suspend_reason.take() {
2047                Some(r) => r,
2048                None => bail_bug!("suspend reason missing when resuming fiber"),
2049            };
2050            match reason {
2051                SuspendReason::NeedWork => {
2052                    if state.worker.is_none() {
2053                        state.worker = Some(fiber);
2054                    } else {
2055                        fiber.dispose(self);
2056                    }
2057                }
2058                SuspendReason::Yielding {
2059                    thread,
2060                    cancellable,
2061                    ..
2062                } => {
2063                    state.get_mut(thread.thread)?.state =
2064                        GuestThreadState::Ready { fiber, cancellable };
2065                    let instance = state.get_mut(thread.task)?.instance.index;
2066                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
2067                }
2068                SuspendReason::ExplicitlySuspending { thread, .. } => {
2069                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
2070                }
2071                SuspendReason::Waiting { set, thread, .. } => {
2072                    let old = state
2073                        .get_mut(set)?
2074                        .waiting
2075                        .insert(thread, WaitMode::Fiber(fiber));
2076                    assert!(old.is_none());
2077                }
2078            };
2079        } else {
2080            log::trace!("resume_fiber: fiber has exited");
2081        }
2082
2083        Ok(())
2084    }
2085
2086    /// Suspend the current fiber, storing the reason in
2087    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
2088    /// it should be resumed.
2089    ///
2090    /// See the `SuspendReason` documentation for details.
2091    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
2092        log::trace!("suspend fiber: {reason:?}");
2093
2094        // If we're yielding or waiting on behalf of a guest thread, we'll need to
2095        // pop the call context which manages resource borrows before suspending
2096        // and then push it again once we've resumed.
2097        let task = match &reason {
2098            SuspendReason::Yielding { thread, .. }
2099            | SuspendReason::Waiting { thread, .. }
2100            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
2101            SuspendReason::NeedWork => None,
2102        };
2103
2104        let old_guest_thread = if task.is_some() {
2105            self.current_thread()?
2106        } else {
2107            CurrentThread::None
2108        };
2109
2110        // We should not have reached here unless either there's no current
2111        // task, or the current task is permitted to block.  In addition, we
2112        // special-case `thread.switch-to` and waiting for a subtask to go from
2113        // `starting` to `started`, both of which we consider non-blocking
2114        // operations despite requiring a suspend.
2115        debug_assert!(
2116            matches!(
2117                reason,
2118                SuspendReason::ExplicitlySuspending {
2119                    skip_may_block_check: true,
2120                    ..
2121                } | SuspendReason::Waiting {
2122                    skip_may_block_check: true,
2123                    ..
2124                } | SuspendReason::Yielding {
2125                    skip_may_block_check: true,
2126                    ..
2127                }
2128            ) || old_guest_thread
2129                .guest()
2130                .map(|thread| self.concurrent_state_mut()?.may_block(thread.task))
2131                .transpose()?
2132                .unwrap_or(true)
2133        );
2134
2135        let suspend_reason = &mut self.concurrent_state_mut()?.suspend_reason;
2136        assert!(suspend_reason.is_none());
2137        *suspend_reason = Some(reason);
2138
2139        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
2140
2141        if task.is_some() {
2142            self.set_thread(old_guest_thread)?;
2143        }
2144
2145        Ok(())
2146    }
2147
2148    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
2149        let caller = self.current_guest_thread()?;
2150        let state = self.concurrent_state_mut()?;
2151
2152        if waitable.common(state)?.set.is_some() {
2153            bail!(Trap::WaitableSyncAndAsync);
2154        }
2155
2156        let set = state.get_mut(caller.thread)?.sync_call_set;
2157        waitable.join(state, Some(set))?;
2158        self.suspend(SuspendReason::Waiting {
2159            set,
2160            thread: caller,
2161            skip_may_block_check: false,
2162        })?;
2163        let state = self.concurrent_state_mut()?;
2164        waitable.join(state, None)
2165    }
2166
2167    /// Cleans up the data structures backing the `guest_thread` specified,
2168    /// removing it from the internal tables of `runtime_instance` as well.
2169    ///
2170    /// This function is used whenever a guest thread has fully exited and
2171    /// completed. This'll clean up the associated `GuestThread` structure and
2172    /// related resources it contains.
2173    ///
2174    /// Other functionality that this implements is:
2175    ///
2176    /// * This will perform conditional cleanup of the `GuestTask` that owns
2177    ///   this thread if `cleanup_task` is `CleanupTask::Yes`.
2178    /// * If there are no more threads in the `GuestTask` that this thread is
2179    ///   associated with, and if the task hasn't produced a result (e.g. it's not
2180    ///   returned or cancelled), then a trap will be raised that a result
2181    ///   wasn't ever produced.
2182    /// * If this task is finished, meaning the top-level thread exited and
2183    ///   additionally it's been returned or cancelled, then this will handle
2184    ///   management of the store's "active interesting tasks" counter.
2185    ///
2186    /// Effectively this is intended to be a "narrow waist" through which many
2187    /// destruction operations are funneled through.
2188    fn cleanup_thread(
2189        &mut self,
2190        guest_thread: QualifiedThreadId,
2191        runtime_instance: RuntimeInstance,
2192        cleanup_task: CleanupTask,
2193    ) -> Result<()> {
2194        let state = self.concurrent_state_mut()?;
2195        let thread_data = state.get_mut(guest_thread.thread)?;
2196        let sync_call_set = thread_data.sync_call_set;
2197        if let Some(guest_id) = thread_data.instance_rep {
2198            self.instance_state(runtime_instance)
2199                .thread_handle_table()
2200                .guest_thread_remove(guest_id)?;
2201        }
2202        let state = self.concurrent_state_mut()?;
2203
2204        // Clean up any pending subtasks in the sync_call_set
2205        for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2206            if let Some(Event::Subtask {
2207                status: Status::Returned | Status::ReturnCancelled,
2208            }) = waitable.common(state)?.event
2209            {
2210                waitable.delete_from(state)?;
2211            }
2212        }
2213
2214        state.delete(guest_thread.thread)?;
2215        state.delete(sync_call_set)?;
2216        let task = state.get_mut(guest_thread.task)?;
2217        task.threads.remove(&guest_thread.thread);
2218
2219        if task.threads.is_empty() && !task.returned_or_cancelled() {
2220            bail!(Trap::NoAsyncResult);
2221        }
2222        let ready_to_delete = task.ready_to_delete();
2223
2224        if !task.decremented_interesting_task_count && task.exited && task.returned_or_cancelled() {
2225            task.decremented_interesting_task_count = true;
2226
2227            debug_assert!(state.interesting_tasks > 0);
2228            state.interesting_tasks -= 1;
2229            if state.interesting_tasks == 0
2230                && let Some(waker) = state.interesting_tasks_empty_waker.take()
2231            {
2232                waker.wake();
2233            }
2234        }
2235
2236        match cleanup_task {
2237            CleanupTask::Yes => {
2238                if ready_to_delete {
2239                    Waitable::Guest(guest_thread.task).delete_from(state)?;
2240                }
2241            }
2242            CleanupTask::No => {}
2243        }
2244
2245        Ok(())
2246    }
2247
2248    /// Performs cancellation of the `guest_task` specified with the
2249    /// precondition that the task hasn't lowered its parameters.
2250    ///
2251    /// In this situation the task hasn't ever been started meaning it hasn't
2252    /// actually run any wasm code yet. This requires cleaning up metadata such
2253    /// as thread information attached to the task.
2254    ///
2255    /// The main two entrypoints for this function are:
2256    ///
2257    /// * Task cancellation via `subtask.cancel`, the intrinsic.
2258    /// * Dropping a host `call_async` future which needs to cancel the task
2259    ///   because it cannot reference its parameters any more.
2260    fn cancel_guest_subtask_without_lowered_parameters(
2261        &mut self,
2262        caller_instance: RuntimeInstance,
2263        guest_task: TableId<GuestTask>,
2264    ) -> Result<()> {
2265        let concurrent_state = self.concurrent_state_mut()?;
2266        let task = concurrent_state.get_mut(guest_task)?;
2267        assert!(!task.already_lowered_parameters());
2268        // The task is in a `starting` state, meaning it hasn't run at
2269        // all yet.  Here we update its fields to indicate that it is
2270        // ready to delete immediately once `subtask.drop` is called.
2271        task.lower_params = None;
2272        task.lift_result = None;
2273        task.exited = true;
2274        let instance = task.instance;
2275
2276        // Clean up the thread within this task as it's now never going
2277        // to run.
2278        assert_eq!(1, task.threads.len());
2279        let thread = *task.threads.iter().next().unwrap();
2280        self.cleanup_thread(
2281            QualifiedThreadId {
2282                task: guest_task,
2283                thread,
2284            },
2285            caller_instance,
2286            CleanupTask::No,
2287        )?;
2288
2289        // Not yet started; cancel and remove from pending
2290        let pending = &mut self.instance_state(instance).concurrent_state().pending;
2291        let pending_count = pending.len();
2292        pending.retain(|thread, _| thread.task != guest_task);
2293        // If there were no pending threads for this task, we're in an error state
2294        if pending.len() == pending_count {
2295            bail!(Trap::SubtaskCancelAfterTerminal);
2296        }
2297        Ok(())
2298    }
2299
2300    /// Used by `ResourceTables` to record the scope of a borrow to get undone
2301    /// in the future.
2302    pub(crate) fn current_scope_id(&mut self) -> Result<Option<u32>> {
2303        if !self.concurrency_support() {
2304            return self.current_scope_id_not_concurrent();
2305        }
2306        let (bits, is_host) = match self.current_thread()? {
2307            CurrentThread::Guest(id) => (id.task.rep(), false),
2308            CurrentThread::Host(id) => (id.rep(), true),
2309            CurrentThread::None => return Ok(None),
2310        };
2311        assert_eq!((bits << 1) >> 1, bits);
2312        Ok(Some((bits << 1) | u32::from(is_host)))
2313    }
2314}
2315
/// Tells `cleanup_thread` whether it may also delete the `GuestTask` that
/// owns the thread being cleaned up.
enum CleanupTask {
    /// Delete the owning task as well, provided it reports that it is ready
    /// to delete.
    Yes,
    /// Leave the owning task in place.
    No,
}
2320
2321impl Instance {
2322    /// Get the next pending event for the specified task and (optional)
2323    /// waitable set, along with the waitable handle if applicable.
2324    fn get_event(
2325        self,
2326        store: &mut StoreOpaque,
2327        guest_task: TableId<GuestTask>,
2328        set: Option<TableId<WaitableSet>>,
2329        cancellable: bool,
2330    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
2331        let state = store.concurrent_state_mut()?;
2332
2333        let event = &mut state.get_mut(guest_task)?.event;
2334        if let Some(ev) = event
2335            && (cancellable || !matches!(ev, Event::Cancelled))
2336        {
2337            log::trace!("deliver event {ev:?} to {guest_task:?}");
2338            let ev = *ev;
2339            *event = None;
2340            return Ok(Some((ev, None)));
2341        }
2342
2343        let set = match set {
2344            Some(set) => set,
2345            None => return Ok(None),
2346        };
2347        let waitable = match state.get_mut(set)?.ready.pop_first() {
2348            Some(v) => v,
2349            None => return Ok(None),
2350        };
2351
2352        let common = waitable.common(state)?;
2353        let handle = match common.handle {
2354            Some(h) => h,
2355            None => bail_bug!("handle not set when delivering event"),
2356        };
2357        let event = match common.event.take() {
2358            Some(e) => e,
2359            None => bail_bug!("event not set when delivering event"),
2360        };
2361
2362        log::trace!(
2363            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
2364        );
2365
2366        waitable.on_delivery(store, self, event)?;
2367
2368        Ok(Some((event, Some((waitable, handle)))))
2369    }
2370
2371    /// Handle the `CallbackCode` returned from an async-lifted export or its
2372    /// callback.
2373    ///
2374    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
2375    /// using `handle_guest_call`.
2376    fn handle_callback_code(
2377        self,
2378        store: &mut StoreOpaque,
2379        guest_thread: QualifiedThreadId,
2380        runtime_instance: RuntimeComponentInstanceIndex,
2381        code: u32,
2382    ) -> Result<Option<GuestCall>> {
2383        let (code, set) = unpack_callback_code(code);
2384
2385        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
2386
2387        let state = store.concurrent_state_mut()?;
2388
2389        let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
2390            let set = store
2391                .instance_state(self.runtime_instance(runtime_instance))
2392                .handle_table()
2393                .waitable_set_rep(handle)?;
2394
2395            Ok(TableId::<WaitableSet>::new(set))
2396        };
2397
2398        Ok(match code {
2399            callback_code::EXIT => {
2400                log::trace!("implicit thread {guest_thread:?} completed");
2401                let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
2402                task.exited = true;
2403                task.callback = None;
2404                store.cleanup_thread(
2405                    guest_thread,
2406                    self.runtime_instance(runtime_instance),
2407                    CleanupTask::Yes,
2408                )?;
2409                None
2410            }
2411            callback_code::YIELD => {
2412                let task = state.get_mut(guest_thread.task)?;
2413                // If an `Event::Cancelled` is pending, we'll deliver that;
2414                // otherwise, we'll deliver `Event::None`.  Note that
2415                // `GuestTask::event` is only ever set to one of those two
2416                // `Event` variants.
2417                if let Some(event) = task.event {
2418                    assert!(matches!(event, Event::None | Event::Cancelled));
2419                } else {
2420                    task.event = Some(Event::None);
2421                }
2422                let call = GuestCall {
2423                    thread: guest_thread,
2424                    kind: GuestCallKind::DeliverEvent {
2425                        instance: self,
2426                        set: None,
2427                    },
2428                };
2429                if state.may_block(guest_thread.task)? {
2430                    // Push this thread onto the "low priority" queue so it runs
2431                    // after any other threads have had a chance to run.
2432                    state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2433                    None
2434                } else {
2435                    // Yielding in a non-blocking context is defined as a no-op
2436                    // according to the spec, so we must run this thread
2437                    // immediately without allowing any others to run.
2438                    Some(call)
2439                }
2440            }
2441            callback_code::WAIT => {
2442                // The task may only return `WAIT` if it was created for a call
2443                // to an async export).  Otherwise, we'll trap.
2444                state.check_blocking_for(guest_thread.task)?;
2445
2446                let set = get_set(store, set)?;
2447                let state = store.concurrent_state_mut()?;
2448
2449                if state.get_mut(guest_thread.task)?.event.is_some()
2450                    || !state.get_mut(set)?.ready.is_empty()
2451                {
2452                    // An event is immediately available; deliver it ASAP.
2453                    state.push_high_priority(WorkItem::GuestCall(
2454                        runtime_instance,
2455                        GuestCall {
2456                            thread: guest_thread,
2457                            kind: GuestCallKind::DeliverEvent {
2458                                instance: self,
2459                                set: Some(set),
2460                            },
2461                        },
2462                    ));
2463                } else {
2464                    // No event is immediately available.
2465                    //
2466                    // We're waiting, so register to be woken up when an event
2467                    // is published for this waitable set.
2468                    //
2469                    // Here we also set `GuestTask::wake_on_cancel` which allows
2470                    // `subtask.cancel` to interrupt the wait.
2471                    let old = state
2472                        .get_mut(guest_thread.thread)?
2473                        .wake_on_cancel
2474                        .replace(set);
2475                    if !old.is_none() {
2476                        bail_bug!("thread unexpectedly had wake_on_cancel set");
2477                    }
2478                    let old = state
2479                        .get_mut(set)?
2480                        .waiting
2481                        .insert(guest_thread, WaitMode::Callback(self));
2482                    if !old.is_none() {
2483                        bail_bug!("set's waiting set already had this thread registered");
2484                    }
2485                }
2486                None
2487            }
2488            _ => bail!(Trap::UnsupportedCallbackCode),
2489        })
2490    }
2491
    /// Add the specified guest call to the "high priority" work item queue, to
    /// be started as soon as backpressure and/or reentrance rules allow.
    ///
    /// Nothing is called here; this only builds a closure and queues it as a
    /// `GuestCallKind::StartImplicit` work item for the instance that owns
    /// `guest_thread`'s task.
    ///
    /// * `guest_thread`: the thread (and task) on whose behalf `callee` runs.
    /// * `callee`: the guest function to invoke.
    /// * `param_count` / `result_count`: the number of core Wasm values
    ///   lowered as parameters and read back as results, respectively.
    /// * `async_`: whether the export is async-lifted; must be `true` when
    ///   `callback` is present.
    /// * `callback`: only its presence is inspected here, to choose between
    ///   the stackless (callback) path and the sync/stackful path.
    /// * `post_return`: optional function passed to `call_post_return` after a
    ///   sync-lifted call's result has been lifted.
    ///
    /// SAFETY: The raw pointer arguments must be valid references to guest
    /// functions (with the appropriate signatures) when the closures queued by
    /// this function are called.
    unsafe fn queue_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
        async_: bool,
        callback: Option<SendSyncPtr<VMFuncRef>>,
        post_return: Option<SendSyncPtr<VMFuncRef>>,
    ) -> Result<()> {
        /// Return a closure which will call the specified function in the scope
        /// of the specified task.
        ///
        /// This will use `GuestTask::lower_params` to lower the parameters, but
        /// will not lift the result; instead, it returns a
        /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
        /// any, may be lifted.  Note that an async-lifted export will have
        /// returned its result using the `task.return` intrinsic (or not
        /// returned a result at all, in the case of `task.cancel`), in which
        /// case the "result" of this call will either be a callback code or
        /// nothing.
        ///
        /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
        /// the returned closure is called.
        unsafe fn make_call<T: 'static>(
            store: StoreContextMut<T>,
            guest_thread: QualifiedThreadId,
            callee: SendSyncPtr<VMFuncRef>,
            param_count: usize,
            result_count: usize,
        ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
        + Send
        + Sync
        + 'static
        + use<T> {
            let token = StoreToken::new(store);
            move |store: &mut dyn VMStore| {
                // Shared buffer: parameters are lowered into it and the
                // callee's results are read back out of it.
                let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

                store
                    .concurrent_state_mut()?
                    .get_mut(guest_thread.thread)?
                    .state = GuestThreadState::Running;
                // The lowering closure is taken out of the task, so it can
                // only be used once.
                let task = store.concurrent_state_mut()?.get_mut(guest_thread.task)?;
                let lower = match task.lower_params.take() {
                    Some(l) => l,
                    None => bail_bug!("lower_params missing"),
                };

                lower(store, &mut storage[..param_count])?;

                let mut store = token.as_context_mut(store);

                // SAFETY: Per the contract documented in `make_call`'s
                // documentation, `callee` must be a valid pointer.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        callee.as_non_null(),
                        // The slice must be large enough for whichever is
                        // bigger: the parameters or the results.
                        NonNull::new(
                            &mut storage[..param_count.max(result_count)]
                                as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }

                Ok(storage)
            }
        }

        // SAFETY: Per the contract described in this function documentation,
        // the `callee` pointer which `call` closes over must be valid when
        // called by the closure we queue below.
        let call = unsafe {
            make_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
            )
        };

        // The instance that owns the task is the one being called into.
        let callee_instance = store
            .0
            .concurrent_state_mut()?
            .get_mut(guest_thread.task)?
            .instance;

        let fun = if callback.is_some() {
            // Stackless path: an async-lifted export with a callback.
            assert!(async_);

            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                store.enter_instance(callee_instance);

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                store.exit_instance(callee_instance)?;

                // Switch back to whichever thread was current before the call
                // and, if that was a guest thread, mark it running again.
                store.set_thread(old_thread)?;
                let state = store.concurrent_state_mut()?;
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("stackless call: restored {old_thread:?} as current thread");

                // SAFETY: `wasmparser` will have validated that the callback
                // function returns a `i32` result.
                let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

                self.handle_callback_code(store, guest_thread, callee_instance.index, code)
            })
                as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
        } else {
            // Sync-lifted export, or callback-less (stackful) async export.
            let token = StoreToken::new(store.as_context_mut());
            Box::new(move |store: &mut dyn VMStore| {
                self.add_guest_thread_to_instance_table(
                    guest_thread.thread,
                    store,
                    callee_instance.index,
                )?;
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
                );
                let flags = self.id().get(store).instance_flags(callee_instance.index);

                // Unless this is a callback-less (i.e. stackful)
                // async-lifted export, we need to record that the instance
                // cannot be entered until the call returns.
                if !async_ {
                    store.enter_instance(callee_instance);
                }

                // SAFETY: See the documentation for `make_call` to review the
                // contract we must uphold for `call` here.
                //
                // Per the contract described in the `queue_call`
                // documentation, the `callee` pointer which `call` closes
                // over must be valid.
                let storage = call(store)?;

                if !async_ {
                    // This is a sync-lifted export, so now is when we lift the
                    // result, optionally call the post-return function, if any,
                    // and finally notify any current or future waiters that the
                    // subtask has returned.

                    let lift = {
                        store.exit_instance(callee_instance)?;

                        let state = store.concurrent_state_mut()?;
                        if !state.get_mut(guest_thread.task)?.result.is_none() {
                            bail_bug!("task has already produced a result");
                        }

                        match state.get_mut(guest_thread.task)?.lift_result.take() {
                            Some(lift) => lift,
                            None => bail_bug!("lift_result field is missing"),
                        }
                    };

                    // SAFETY: `result_count` represents the number of core Wasm
                    // results returned, per `wasmparser`.
                    let result = (lift.lift)(store, unsafe {
                        mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                            &storage[..result_count],
                        )
                    })?;

                    // The post-return function receives the call's single
                    // core result, or a zero placeholder when there is none.
                    let post_return_arg = match result_count {
                        0 => ValRaw::i32(0),
                        // SAFETY: `result_count` represents the number of
                        // core Wasm results returned, per `wasmparser`.
                        1 => unsafe { storage[0].assume_init() },
                        _ => unreachable!(),
                    };

                    // SAFETY: NOTE(review): presumably this relies on
                    // `post_return`, if present, being a valid guest function
                    // pointer per `queue_call`'s contract -- confirm against
                    // the requirements of `call_post_return`.
                    unsafe {
                        call_post_return(
                            token.as_context_mut(store),
                            post_return.map(|v| v.as_non_null()),
                            post_return_arg,
                            flags,
                        )?;
                    }

                    self.task_complete(store, guest_thread.task, result, Status::Returned)?;
                }

                store.set_thread(old_thread)?;

                store
                    .concurrent_state_mut()?
                    .get_mut(guest_thread.task)?
                    .exited = true;

                // This is a callback-less call, so the implicit thread has now completed
                store.cleanup_thread(guest_thread, callee_instance, CleanupTask::Yes)?;
                Ok(None)
            })
        };

        // Queue the closure; it runs when the event loop picks up this item.
        store
            .0
            .concurrent_state_mut()?
            .push_high_priority(WorkItem::GuestCall(
                callee_instance.index,
                GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::StartImplicit(fun),
                },
            ));

        Ok(())
    }
2732
    /// Prepare (but do not start) a guest->guest call.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
    /// are synthesized Wasm functions which move the parameters from the caller
    /// to the callee and the result from the callee to the caller,
    /// respectively.  The adapter will call `Self::start_call` immediately
    /// after calling this function.
    ///
    /// A new `GuestTask` is created whose caller is the current guest thread,
    /// and the new task's thread is then made the current thread.
    ///
    /// * `caller_instance`: debug-asserted to be the instance of the current
    ///   guest thread's task.
    /// * `callee_instance`: the instance the new task is created for.
    /// * `task_return_type`, `memory`, `string_encoding`: recorded in the
    ///   task's `LiftResult`.
    /// * `callee_async`: forwarded to `GuestTask::new`; combined with a sync
    ///   caller it triggers a blocking check up front.
    /// * `caller_info`: the caller's raw parameters, plus how it expects to
    ///   receive its result.
    ///
    /// SAFETY: All the pointer arguments must be valid pointers to guest
    /// entities (and with the expected signatures for the function references
    /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
    unsafe fn prepare_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        memory: *mut VMMemoryDefinition,
        string_encoding: StringEncoding,
        caller_info: CallerInfo,
    ) -> Result<()> {
        if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
            // A task may only call an async-typed function via a sync lower if
            // it was created by a call to an async export.  Otherwise, we'll
            // trap.
            store.0.check_blocking()?;
        }

        // Where the caller expects to find the call's result.
        enum ResultInfo {
            // The result goes through a pointer, taken from the caller's last
            // parameter and appended to the arguments given to `return_`.
            Heap { results: u32 },
            // The result is returned as `result_count` flat values (only 0 or
            // 1 is handled when the sync result is recorded below).
            Stack { result_count: u32 },
        }

        let result_info = match &caller_info {
            // Async caller with a result: the last parameter is the return
            // pointer.
            CallerInfo::Async {
                has_result: true,
                params,
            } => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("retptr missing"),
                },
            },
            CallerInfo::Async {
                has_result: false, ..
            } => ResultInfo::Stack { result_count: 0 },
            // Sync caller with more results than `MAX_FLAT_RESULTS`: the last
            // parameter is read as the pointer for the results.
            CallerInfo::Sync {
                result_count,
                params,
            } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
                results: match params.last() {
                    Some(r) => r.get_u32(),
                    None => bail_bug!("arg ptr missing"),
                },
            },
            CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
                result_count: *result_count,
            },
        };

        let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

        // Create a new guest task for the call, closing over the `start` and
        // `return_` functions to lift the parameters and lower the result,
        // respectively.
        let start = SendSyncPtr::new(start);
        let return_ = SendSyncPtr::new(return_);
        let token = StoreToken::new(store.as_context_mut());
        let old_thread = store.0.current_guest_thread()?;
        let state = store.0.concurrent_state_mut()?;

        debug_assert_eq!(
            state.get_mut(old_thread.task)?.instance,
            self.runtime_instance(caller_instance)
        );

        let guest_thread = GuestTask::new(
            state,
            // Parameter-lowering closure: runs `start` over the caller's raw
            // parameters and copies what it produced into `dst`.
            Box::new(move |store, dst| {
                let mut store = token.as_context_mut(store);
                assert!(dst.len() <= MAX_FLAT_PARAMS);
                // The `+ 1` here accounts for the return pointer, if any:
                let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
                let count = match caller_info {
                    // Async callers, if they have a result, use the last
                    // parameter as a return pointer so chop that off if
                    // relevant here.
                    CallerInfo::Async { params, has_result } => {
                        let params = &params[..params.len() - usize::from(has_result)];
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }

                    // Sync callers forward everything directly.
                    CallerInfo::Sync { params, .. } => {
                        for (param, src) in params.iter().zip(&mut src) {
                            src.write(*param);
                        }
                        params.len()
                    }
                };
                // SAFETY: `start` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based on
                // how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
                // for details) we know it takes count parameters and returns
                // `dst.len()` results.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        start.as_non_null(),
                        NonNull::new(
                            &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                        )
                        .unwrap(),
                    )?;
                }
                dst.copy_from_slice(&src[..dst.len()]);
                // Publish a `Started` subtask event on the current thread's
                // task now that its parameters have been delivered.
                let task = store.0.current_guest_thread()?.task;
                let state = store.0.concurrent_state_mut()?;
                Waitable::Guest(task).set_event(
                    state,
                    Some(Event::Subtask {
                        status: Status::Started,
                    }),
                )?;
                Ok(())
            }),
            LiftResult {
                // Result-lifting closure: runs `return_` over the callee's raw
                // results to hand them to the caller.
                lift: Box::new(move |store, src| {
                    // SAFETY: See comment in closure passed as `lower_params`
                    // parameter above.
                    let mut store = token.as_context_mut(store);
                    let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                    // When the result goes through a pointer, `return_` takes
                    // that pointer as one extra trailing argument.
                    if let ResultInfo::Heap { results } = &result_info {
                        my_src.push(ValRaw::u32(*results));
                    }

                    // Execute the `return_` hook, generated by Wasmtime's FACT
                    // compiler, in the context of the old thread. The old
                    // thread, this thread's caller, may have `realloc`
                    // callbacks invoked for example and those need the correct
                    // context set for the current thread.
                    let prev = store.0.set_thread(old_thread)?;

                    // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                    // `wasmtime-cranelift`-generated fused adapter code.  Based
                    // on how it was constructed (see
                    // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                    // for details) we know it takes `src.len()` parameters and
                    // returns up to 1 result.
                    unsafe {
                        crate::Func::call_unchecked_raw(
                            &mut store,
                            return_.as_non_null(),
                            my_src.as_mut_slice().into(),
                        )?;
                    }

                    // Restore the previous current thread after the
                    // lifting/lowering has returned.
                    store.0.set_thread(prev)?;

                    let thread = store.0.current_guest_thread()?;
                    let state = store.0.concurrent_state_mut()?;
                    // For a sync caller, record the result on the task: the
                    // single flat value if there is exactly one, and `None`
                    // when there is none or it went through a pointer.
                    if sync_caller {
                        state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                            if let ResultInfo::Stack { result_count } = &result_info {
                                match result_count {
                                    0 => None,
                                    1 => Some(my_src[0]),
                                    _ => unreachable!(),
                                }
                            } else {
                                None
                            },
                        );
                    }
                    // The result has already been handed to the caller by
                    // `return_`, so only a placeholder value is returned.
                    Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
                }),
                ty: task_return_type,
                memory: NonNull::new(memory).map(SendSyncPtr::new),
                string_encoding,
            },
            Caller::Guest { thread: old_thread },
            None,
            self.runtime_instance(callee_instance),
            callee_async,
        )?;

        // Make the new thread the current one so that `Self::start_call` knows
        // which one to start.
        store.0.set_thread(guest_thread)?;
        log::trace!("pushed {guest_thread:?} as current thread; old thread was {old_thread:?}");

        Ok(())
    }
2936
2937    /// Call the specified callback function for an async-lifted export.
2938    ///
2939    /// SAFETY: `function` must be a valid reference to a guest function of the
2940    /// correct signature for a callback.
2941    unsafe fn call_callback<T>(
2942        self,
2943        mut store: StoreContextMut<T>,
2944        function: SendSyncPtr<VMFuncRef>,
2945        event: Event,
2946        handle: u32,
2947    ) -> Result<u32> {
2948        let (ordinal, result) = event.parts();
2949        let params = &mut [
2950            ValRaw::u32(ordinal),
2951            ValRaw::u32(handle),
2952            ValRaw::u32(result),
2953        ];
2954        // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2955        // `wasmtime-cranelift`-generated fused adapter code or
2956        // `component::Options`.  Per `wasmparser` callback signature
2957        // validation, we know it takes three parameters and returns one.
2958        unsafe {
2959            crate::Func::call_unchecked_raw(
2960                &mut store,
2961                function.as_non_null(),
2962                params.as_mut_slice().into(),
2963            )?;
2964        }
2965        Ok(params[0].get_u32())
2966    }
2967
    /// Start a guest->guest call previously prepared using
    /// `Self::prepare_call`.
    ///
    /// This is called from fused adapter code generated in
    /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
    /// this function immediately after calling `Self::prepare_call`.
    ///
    /// Parameters:
    ///
    /// - `callback`: may be null; when non-null it is stored on the callee task
    ///   so it can be invoked later with events.
    /// - `post_return`: may be null; forwarded to `Self::queue_call`.
    /// - `callee`: the guest function to call; forwarded to `Self::queue_call`.
    /// - `param_count` / `result_count`: flat parameter and result counts;
    ///   asserted to be at most `MAX_FLAT_PARAMS` / `MAX_FLAT_RESULTS`.
    /// - `flags`: only the `START_FLAG_ASYNC_CALLEE` bit is examined here.
    /// - `storage`: `None` is treated as "the caller is async"; when `Some`,
    ///   the first slot receives the callee's single stack result, if any.
    ///
    /// Returns the callee's `Status` packed together with the subtask handle
    /// (if one was allocated) via `Status::pack`.
    ///
    /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
    /// functions with the appropriate signatures for the current guest task.
    /// If this is a call to an async-lowered import, the actual call may be
    /// deferred and run after this function returns, in which case the pointer
    /// arguments must also be valid when the call happens.
    unsafe fn start_call<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
        storage: Option<&mut [MaybeUninit<ValRaw>]>,
    ) -> Result<u32> {
        // The token is captured by the callback closure below so it can turn
        // the type-erased store it receives back into a `StoreContextMut<T>`.
        let token = StoreToken::new(store.as_context_mut());
        // No result storage means the caller does not expect a result to be
        // written back synchronously, i.e. it is an async caller.
        let async_caller = storage.is_none();
        // `Self::prepare_call` made the callee's thread the current one, so
        // "current" here refers to the callee, not the caller.
        let guest_thread = store.0.current_guest_thread()?;
        let state = store.0.concurrent_state_mut()?;
        let callee_async = state.get_mut(guest_thread.task)?.async_function;
        let callee = SendSyncPtr::new(callee);
        let param_count = usize::try_from(param_count)?;
        assert!(param_count <= MAX_FLAT_PARAMS);
        let result_count = usize::try_from(result_count)?;
        assert!(result_count <= MAX_FLAT_RESULTS);

        let task = state.get_mut(guest_thread.task)?;
        if let Some(callback) = NonNull::new(callback) {
            // We're calling an async-lifted export with a callback, so store
            // the callback and related context as part of the task so we can
            // call it later when needed.
            let callback = SendSyncPtr::new(callback);
            task.callback = Some(Box::new(move |store, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: `callback` is the non-null pointer our caller
                // supplied; per this function's contract it remains a valid
                // callback function for as long as the call may run.
                unsafe { self.call_callback::<T>(store, callback, event, handle) }
            }));
        }

        let Caller::Guest { thread: caller } = &task.caller else {
            // As of this writing, `start_call` is only used for guest->guest
            // calls.
            bail_bug!("start_call unexpectedly invoked for host->guest call");
        };
        // Copy the caller's thread id out so the borrow of `task` ends here.
        let caller = *caller;
        let caller_instance = state.get_mut(caller.task)?.instance;

        // Queue the call as a "high priority" work item.
        //
        // SAFETY: the function pointers are forwarded unchanged from our own
        // caller, who vouches for them per this function's contract.
        // NOTE(review): `queue_call`'s exact requirements are not visible
        // from here -- confirm they match.
        unsafe {
            self.queue_call(
                store.as_context_mut(),
                guest_thread,
                callee,
                param_count,
                result_count,
                (flags & START_FLAG_ASYNC_CALLEE) != 0,
                NonNull::new(callback).map(SendSyncPtr::new),
                NonNull::new(post_return).map(SendSyncPtr::new),
            )?;
        }

        let state = store.0.concurrent_state_mut()?;

        // Use the caller's `GuestThread::sync_call_set` to register interest in
        // the subtask...
        let guest_waitable = Waitable::Guest(guest_thread.task);
        // Remember which set (if any) the subtask belonged to so that
        // membership can be restored once we are done waiting.
        let old_set = guest_waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        guest_waitable.join(state, Some(set))?;

        // No guest thread is current while this fiber is suspended.
        store.0.set_thread(CurrentThread::None)?;

        // ... and suspend this fiber temporarily while we wait for it to start.
        //
        // Note that we _could_ call the callee directly using the current fiber
        // rather than suspend this one, but that would make reasoning about the
        // event loop more complicated and is probably only worth doing if
        // there's a measurable performance benefit.  In addition, it would mean
        // blocking the caller if the callee calls a blocking sync-lowered
        // import, and as of this writing the spec says we must not do that.
        //
        // Alternatively, the fused adapter code could be modified to call the
        // callee directly without calling a host-provided intrinsic at all (in
        // which case it would need to do its own, inline backpressure checks,
        // etc.).  Again, we'd want to see a measurable performance benefit
        // before committing to such an optimization.  And again, we'd need to
        // update the spec to allow that.
        let (status, waitable) = loop {
            store.0.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                // Normally, `StoreOpaque::suspend` would assert it's being
                // called from a context where blocking is allowed.  However, if
                // `async_caller` is `true`, we'll only "block" long enough for
                // the callee to start, i.e. we won't repeat this loop, so we
                // tell `suspend` it's okay even if we're not allowed to block.
                // Alternatively, if the callee is not an async function, then
                // we know it won't block anyway.
                skip_may_block_check: async_caller || !callee_async,
            })?;

            let state = store.0.concurrent_state_mut()?;

            // We were resumed, so the subtask must have an event pending; any
            // other outcome is treated as an internal bug.
            log::trace!("taking event for {:?}", guest_thread.task);
            let event = guest_waitable.take_event(state)?;
            let Some(Event::Subtask { status }) = event else {
                bail_bug!("subtasks should only get subtask events, got {event:?}")
            };

            log::trace!("status {status:?} for {:?}", guest_thread.task);

            if status == Status::Returned {
                // It returned, so we can stop waiting.
                break (status, None);
            } else if async_caller {
                // It hasn't returned yet, but the caller is calling via an
                // async-lowered import, so we generate a handle for the task
                // waitable and return the status.
                let handle = store
                    .0
                    .instance_state(caller_instance)
                    .handle_table()
                    .subtask_insert_guest(guest_thread.task.rep())?;
                // Record the handle on the task as well.
                store
                    .0
                    .concurrent_state_mut()?
                    .get_mut(guest_thread.task)?
                    .common
                    .handle = Some(handle);
                break (status, Some(handle));
            } else {
                // The callee hasn't returned yet, and the caller is calling via
                // a sync-lowered import, so we loop and keep waiting until the
                // callee returns.
            }
        };

        // Put the subtask back into whichever set it was in before we
        // temporarily moved it into the caller's `sync_call_set`.
        guest_waitable.join(store.0.concurrent_state_mut()?, old_set)?;

        // Reset the current thread to point to the caller as it resumes control.
        store.0.set_thread(caller)?;
        store
            .0
            .concurrent_state_mut()?
            .get_mut(caller.thread)?
            .state = GuestThreadState::Running;
        log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

        if let Some(storage) = storage {
            // The caller used a sync-lowered import to call an async-lifted
            // export, in which case the result, if any, has been stashed in
            // `GuestTask::sync_result`.
            let state = store.0.concurrent_state_mut()?;
            let task = state.get_mut(guest_thread.task)?;
            if let Some(result) = task.sync_result.take()? {
                // The inner `Option` distinguishes "produced one stack value"
                // from "produced no stack value".
                if let Some(result) = result {
                    storage[0] = MaybeUninit::new(result);
                }

                // The result has been consumed; if the task has also exited
                // and reports itself ready, delete it now.
                if task.exited && task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }

        Ok(status.pack(waitable))
    }
3142
    /// Poll the specified future once on behalf of a guest->host call using an
    /// async-lowered import.
    ///
    /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
    /// `Pending`, add it to the set of futures to be polled as part of this
    /// instance's event loop until it completes, and then return
    /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
    ///
    /// Whether the future returns `Ready` immediately or later, the `lower`
    /// function will be used to lower the result, if any, into the guest caller's
    /// stack and linear memory. The `lower` function is invoked with `None` if
    /// the future is cancelled.
    ///
    /// The current thread must be a host task in the `CalleeStarted` state;
    /// this is asserted.  On the `Pending` path the current thread is switched
    /// to the host task's caller before returning.
    pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
        self,
        mut store: StoreContextMut<'_, T>,
        future: impl Future<Output = Result<R>> + Send + 'static,
        lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
    ) -> Result<Option<u32>> {
        // The token lets the completion closure below recover a typed
        // `StoreContextMut<T>` from the type-erased store it is given.
        let token = StoreToken::new(store.as_context_mut());
        let task = store.0.current_host_thread()?;
        let state = store.0.concurrent_state_mut()?;

        // Create an abortable future which hooks calls to poll and manages call
        // context state for the future.
        let (join_handle, future) = JoinHandle::run(future);
        {
            // Stash the join handle in the task's state so the future can be
            // reached through the task later.
            let state = &mut state.get_mut(task)?.state;
            assert!(matches!(state, HostTaskState::CalleeStarted));
            *state = HostTaskState::CalleeRunning(join_handle);
        }

        let mut future = Box::pin(future);

        // Finally, poll the future.  We can use a dummy `Waker` here because
        // we'll add the future to `ConcurrentState::futures` and poll it
        // automatically from the event loop if it doesn't complete immediately
        // here.
        let poll = tls::set(store.0, || {
            future
                .as_mut()
                .poll(&mut Context::from_waker(&Waker::noop()))
        });

        match poll {
            // It finished immediately; lower the result right away and tell
            // the caller that no waitable handle is needed.
            Poll::Ready(result) => {
                // The wrapped future yields `Option<Result<R>>`; turn that
                // into `Option<R>`, propagating any error.
                let result = result.transpose()?;
                lower(store.as_context_mut(), result)?;
                return Ok(None);
            }

            // Future isn't ready yet, so fall through.
            Poll::Pending => {}
        }

        // It hasn't finished yet; add the future to
        // `ConcurrentState::futures` so it will be polled by the event
        // loop and allocate a waitable handle to return to the guest.

        // Wrap the future in an async block responsible for lowering the
        // result into the guest's stack and memory, as well as notifying any
        // waiters that the task returned.
        let future = Box::pin(async move {
            // `None` means the future yielded no value (the cancelled case);
            // an `Err` from the host function propagates out of this block.
            let result = match future.await {
                Some(result) => Some(result?),
                None => None,
            };
            let on_complete = move |store: &mut dyn VMStore| {
                // Restore the `current_thread` to be the host so `lower` knows
                // how to manipulate borrows and knows which scope of borrows
                // to check.
                let mut store = token.as_context_mut(store);
                let old = store.0.set_thread(task)?;

                // Decide the status before `result` is moved into `lower`.
                let status = if result.is_some() {
                    Status::Returned
                } else {
                    Status::ReturnCancelled
                };

                lower(store.as_context_mut(), result)?;
                let state = store.0.concurrent_state_mut()?;
                match &mut state.get_mut(task)?.state {
                    // The task is already flagged as finished because it was
                    // cancelled. No need to transition further.
                    HostTaskState::CalleeDone { .. } => {}

                    // Otherwise transition this task to the done state.
                    other => *other = HostTaskState::CalleeDone { cancelled: false },
                }
                // Publish the terminal status as a subtask event on the task.
                Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

                // Put back whichever thread was current before we switched.
                store.0.set_thread(old)?;
                Ok(())
            };

            // Here we schedule a task to run on a worker fiber to do the
            // lowering since it may involve a call to the guest's realloc
            // function. This is necessary because calling the guest while
            // there are host embedder frames on the stack is unsound.
            tls::get(move |store| {
                store
                    .concurrent_state_mut()?
                    .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                        on_complete,
                    ))));
                Ok(())
            })
        });

        // Make this task visible to the guest and then record what it
        // was made visible as.
        let state = store.0.concurrent_state_mut()?;
        state.push_future(future);
        // The handle is allocated in the handle table of the instance that
        // owns the calling task.
        let caller = state.get_mut(task)?.caller;
        let instance = state.get_mut(caller.task)?.instance;
        let handle = store
            .0
            .instance_state(instance)
            .handle_table()
            .subtask_insert_host(task.rep())?;
        store.0.concurrent_state_mut()?.get_mut(task)?.common.handle = Some(handle);
        log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

        // Restore the currently running thread to this host task's
        // caller. Note that the host task isn't deallocated as it's
        // within the store and will get deallocated later.
        store.0.set_thread(caller)?;
        Ok(Some(handle))
    }
3273
3274    /// Implements the `task.return` intrinsic, lifting the result for the
3275    /// current guest task.
3276    pub(crate) fn task_return(
3277        self,
3278        store: &mut dyn VMStore,
3279        ty: TypeTupleIndex,
3280        options: OptionsIndex,
3281        storage: &[ValRaw],
3282    ) -> Result<()> {
3283        let guest_thread = store.current_guest_thread()?;
3284        let state = store.concurrent_state_mut()?;
3285        let lift = state
3286            .get_mut(guest_thread.task)?
3287            .lift_result
3288            .take()
3289            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3290        if !state.get_mut(guest_thread.task)?.result.is_none() {
3291            bail_bug!("task result unexpectedly already set");
3292        }
3293
3294        let CanonicalOptions {
3295            string_encoding,
3296            data_model,
3297            ..
3298        } = &self.id().get(store).component().env_component().options[options];
3299
3300        let invalid = ty != lift.ty
3301            || string_encoding != &lift.string_encoding
3302            || match data_model {
3303                CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
3304                    Some(memory) => {
3305                        let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
3306                        let actual = self.id().get(store).runtime_memory(memory);
3307                        expected != actual.as_ptr()
3308                    }
3309                    // Memory not specified, meaning it didn't need to be
3310                    // specified per validation, so not invalid.
3311                    None => false,
3312                },
3313                // Always invalid as this isn't supported.
3314                CanonicalOptionsDataModel::Gc { .. } => true,
3315            };
3316
3317        if invalid {
3318            bail!(Trap::TaskReturnInvalid);
3319        }
3320
3321        log::trace!("task.return for {guest_thread:?}");
3322
3323        let result = (lift.lift)(store, storage)?;
3324        self.task_complete(store, guest_thread.task, result, Status::Returned)
3325    }
3326
3327    /// Implements the `task.cancel` intrinsic.
3328    pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
3329        let guest_thread = store.current_guest_thread()?;
3330        let state = store.concurrent_state_mut()?;
3331        let task = state.get_mut(guest_thread.task)?;
3332        if !task.cancel_sent {
3333            bail!(Trap::TaskCancelNotCancelled);
3334        }
3335        _ = task
3336            .lift_result
3337            .take()
3338            .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
3339
3340        if !task.result.is_none() {
3341            bail_bug!("task result should not bet set yet");
3342        }
3343
3344        log::trace!("task.cancel for {guest_thread:?}");
3345
3346        self.task_complete(
3347            store,
3348            guest_thread.task,
3349            Box::new(DummyResult),
3350            Status::ReturnCancelled,
3351        )
3352    }
3353
3354    /// Complete the specified guest task (i.e. indicate that it has either
3355    /// returned a (possibly empty) result or cancelled itself).
3356    ///
3357    /// This will return any resource borrows and notify any current or future
3358    /// waiters that the task has completed.
3359    fn task_complete(
3360        self,
3361        store: &mut StoreOpaque,
3362        guest_task: TableId<GuestTask>,
3363        result: Box<dyn Any + Send + Sync>,
3364        status: Status,
3365    ) -> Result<()> {
3366        store
3367            .component_resource_tables(Some(self))?
3368            .validate_scope_exit()?;
3369
3370        let state = store.concurrent_state_mut()?;
3371        let task = state.get_mut(guest_task)?;
3372
3373        if let Caller::Host { tx, .. } = &mut task.caller {
3374            if let Some(tx) = tx.take() {
3375                _ = tx.send(result);
3376            }
3377        } else {
3378            task.result = Some(result);
3379            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3380        }
3381
3382        Ok(())
3383    }
3384
3385    /// Implements the `waitable-set.new` intrinsic.
3386    pub(crate) fn waitable_set_new(
3387        self,
3388        store: &mut StoreOpaque,
3389        caller_instance: RuntimeComponentInstanceIndex,
3390    ) -> Result<u32> {
3391        let set = store.concurrent_state_mut()?.push(WaitableSet::default())?;
3392        let handle = store
3393            .instance_state(self.runtime_instance(caller_instance))
3394            .handle_table()
3395            .waitable_set_insert(set.rep())?;
3396        log::trace!("new waitable set {set:?} (handle {handle})");
3397        Ok(handle)
3398    }
3399
3400    /// Implements the `waitable-set.drop` intrinsic.
3401    pub(crate) fn waitable_set_drop(
3402        self,
3403        store: &mut StoreOpaque,
3404        caller_instance: RuntimeComponentInstanceIndex,
3405        set: u32,
3406    ) -> Result<()> {
3407        let rep = store
3408            .instance_state(self.runtime_instance(caller_instance))
3409            .handle_table()
3410            .waitable_set_remove(set)?;
3411
3412        log::trace!("drop waitable set {rep} (handle {set})");
3413
3414        // Note that we're careful to check for waiters _before_ deleting the
3415        // set to avoid dropping any waiters in `WaitMode::Fiber(_)`, which
3416        // would panic.  See `drop-waitable-set-with-waiters.wast` for details.
3417        if !store
3418            .concurrent_state_mut()?
3419            .get_mut(TableId::<WaitableSet>::new(rep))?
3420            .waiting
3421            .is_empty()
3422        {
3423            bail!(Trap::WaitableSetDropHasWaiters);
3424        }
3425
3426        store
3427            .concurrent_state_mut()?
3428            .delete(TableId::<WaitableSet>::new(rep))?;
3429
3430        Ok(())
3431    }
3432
3433    /// Implements the `waitable.join` intrinsic.
3434    pub(crate) fn waitable_join(
3435        self,
3436        store: &mut StoreOpaque,
3437        caller_instance: RuntimeComponentInstanceIndex,
3438        waitable_handle: u32,
3439        set_handle: u32,
3440    ) -> Result<()> {
3441        let mut instance = self.id().get_mut(store);
3442        let waitable =
3443            Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3444
3445        let set = if set_handle == 0 {
3446            None
3447        } else {
3448            let set = instance.instance_states().0[caller_instance]
3449                .handle_table()
3450                .waitable_set_rep(set_handle)?;
3451
3452            let state = store.concurrent_state_mut()?;
3453            if let Some(old) = waitable.common(state)?.set
3454                && state.get_mut(old)?.is_sync_call_set
3455            {
3456                bail!(Trap::WaitableSyncAndAsync);
3457            }
3458
3459            Some(TableId::<WaitableSet>::new(set))
3460        };
3461
3462        log::trace!(
3463            "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3464        );
3465
3466        waitable.join(store.concurrent_state_mut()?, set)
3467    }
3468
3469    /// Implements the `subtask.drop` intrinsic.
3470    pub(crate) fn subtask_drop(
3471        self,
3472        store: &mut StoreOpaque,
3473        caller_instance: RuntimeComponentInstanceIndex,
3474        task_id: u32,
3475    ) -> Result<()> {
3476        self.waitable_join(store, caller_instance, task_id, 0)?;
3477
3478        let (rep, is_host) = store
3479            .instance_state(self.runtime_instance(caller_instance))
3480            .handle_table()
3481            .subtask_remove(task_id)?;
3482
3483        let concurrent_state = store.concurrent_state_mut()?;
3484        let (waitable, delete) = if is_host {
3485            let id = TableId::<HostTask>::new(rep);
3486            let task = concurrent_state.get_mut(id)?;
3487            match &task.state {
3488                HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3489                HostTaskState::CalleeDone { .. } => {}
3490                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3491                    bail_bug!("invalid state for callee in `subtask.drop`")
3492                }
3493            }
3494            (Waitable::Host(id), true)
3495        } else {
3496            let id = TableId::<GuestTask>::new(rep);
3497            let task = concurrent_state.get_mut(id)?;
3498            if task.lift_result.is_some() {
3499                bail!(Trap::SubtaskDropNotResolved);
3500            }
3501            (
3502                Waitable::Guest(id),
3503                concurrent_state.get_mut(id)?.ready_to_delete(),
3504            )
3505        };
3506
3507        waitable.common(concurrent_state)?.handle = None;
3508
3509        // If this subtask has an event that means that the terminal status of
3510        // this subtask wasn't yet received so it can't be dropped yet.
3511        if waitable.take_event(concurrent_state)?.is_some() {
3512            bail!(Trap::SubtaskDropNotResolved);
3513        }
3514
3515        if delete {
3516            waitable.delete_from(concurrent_state)?;
3517        }
3518
3519        log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3520        Ok(())
3521    }
3522
3523    /// Implements the `waitable-set.wait` intrinsic.
3524    pub(crate) fn waitable_set_wait(
3525        self,
3526        store: &mut StoreOpaque,
3527        options: OptionsIndex,
3528        set: u32,
3529        payload: u32,
3530    ) -> Result<u32> {
3531        if !self.options(store, options).async_ {
3532            // The caller may only call `waitable-set.wait` from an async task
3533            // (i.e. a task created via a call to an async export).
3534            // Otherwise, we'll trap.
3535            store.check_blocking()?;
3536        }
3537
3538        let &CanonicalOptions {
3539            cancellable,
3540            instance: caller_instance,
3541            ..
3542        } = &self.id().get(store).component().env_component().options[options];
3543        let rep = store
3544            .instance_state(self.runtime_instance(caller_instance))
3545            .handle_table()
3546            .waitable_set_rep(set)?;
3547
3548        self.waitable_check(
3549            store,
3550            cancellable,
3551            WaitableCheck::Wait,
3552            WaitableCheckParams {
3553                set: TableId::new(rep),
3554                options,
3555                payload,
3556            },
3557        )
3558    }
3559
3560    /// Implements the `waitable-set.poll` intrinsic.
3561    pub(crate) fn waitable_set_poll(
3562        self,
3563        store: &mut StoreOpaque,
3564        options: OptionsIndex,
3565        set: u32,
3566        payload: u32,
3567    ) -> Result<u32> {
3568        let &CanonicalOptions {
3569            cancellable,
3570            instance: caller_instance,
3571            ..
3572        } = &self.id().get(store).component().env_component().options[options];
3573        let rep = store
3574            .instance_state(self.runtime_instance(caller_instance))
3575            .handle_table()
3576            .waitable_set_rep(set)?;
3577
3578        self.waitable_check(
3579            store,
3580            cancellable,
3581            WaitableCheck::Poll,
3582            WaitableCheckParams {
3583                set: TableId::new(rep),
3584                options,
3585                payload,
3586            },
3587        )
3588    }
3589
3590    /// Implements the `thread.index` intrinsic.
3591    pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3592        let thread_id = store.current_guest_thread()?.thread;
3593        match store
3594            .concurrent_state_mut()?
3595            .get_mut(thread_id)?
3596            .instance_rep
3597        {
3598            Some(r) => Ok(r),
3599            None => bail_bug!("thread should have instance_rep by now"),
3600        }
3601    }
3602
    /// Implements the `thread.new-indirect` intrinsic.
    ///
    /// Looks up entry `start_func_idx` of table `start_func_table_idx`,
    /// checks that it has type `(i32) -> ()`, and creates a new explicit
    /// guest thread belonging to the current task which, once started, calls
    /// that function with `context` as its sole argument.  The thread is only
    /// created here; the closure built below runs when something later
    /// invokes it.
    ///
    /// Returns whatever `Self::add_guest_thread_to_instance_table` produces
    /// for the new thread (presumably its guest-visible index -- confirm).
    ///
    /// Traps with `Trap::ThreadNewIndirectUninitialized` if the table slot is
    /// empty and with `Trap::ThreadNewIndirectInvalidType` if the function
    /// has a different type.
    pub(crate) fn thread_new_indirect<T: 'static>(
        self,
        mut store: StoreContextMut<T>,
        runtime_instance: RuntimeComponentInstanceIndex,
        _func_ty_idx: TypeFuncIndex, // currently unused
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        log::trace!("creating new thread");

        // The only start-function signature accepted here: one `i32`
        // parameter and no results.
        let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
        let (instance, registry) = self.id().get_mut_and_registry(store.0);
        let callee = instance
            .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
            .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        // The token lets the closure below recover a typed `StoreContextMut<T>`
        // from the type-erased store it is handed.
        let token = StoreToken::new(store.as_context_mut());
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                // Make the new thread current for the duration of the call,
                // remembering whichever thread it displaced.
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // Use call_unchecked rather than call or call_async, as we don't want to run the function
                // on a separate fiber if we're running in an async store.
                //
                // NOTE(review): soundness presumably rests on the type check
                // performed when the thread was created (`callee` has type
                // `(i32) -> ()`) together with `params` holding exactly one
                // `i32` -- confirm against `call_unchecked`'s contract.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                // The start function has returned: switch back to the
                // displaced thread before cleaning up the finished one.
                store.0.set_thread(old_thread)?;

                store.0.cleanup_thread(
                    guest_thread,
                    self.runtime_instance(runtime_instance),
                    CleanupTask::Yes,
                )?;
                log::trace!("explicit thread {guest_thread:?} completed");
                let state = store.0.concurrent_state_mut()?;
                // If the displaced thread was a guest thread, mark it as
                // running again.
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        // The new thread is attached to the task of the thread that issued
        // the intrinsic.
        let current_thread = store.0.current_guest_thread()?;
        let state = store.0.concurrent_state_mut()?;
        let parent_task = current_thread.task;

        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }
3668
3669    pub(crate) fn resume_thread(
3670        self,
3671        store: &mut StoreOpaque,
3672        runtime_instance: RuntimeComponentInstanceIndex,
3673        thread_idx: u32,
3674        high_priority: bool,
3675        allow_ready: bool,
3676    ) -> Result<()> {
3677        let thread_id =
3678            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3679        let state = store.concurrent_state_mut()?;
3680        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3681        let thread = state.get_mut(guest_thread.thread)?;
3682
3683        match mem::replace(&mut thread.state, GuestThreadState::Running) {
3684            GuestThreadState::NotStartedExplicit(start_func) => {
3685                log::trace!("starting thread {guest_thread:?}");
3686                let guest_call = WorkItem::GuestCall(
3687                    runtime_instance,
3688                    GuestCall {
3689                        thread: guest_thread,
3690                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3691                            start_func(store, guest_thread)
3692                        })),
3693                    },
3694                );
3695                store
3696                    .concurrent_state_mut()?
3697                    .push_work_item(guest_call, high_priority);
3698            }
3699            GuestThreadState::Suspended(fiber) => {
3700                log::trace!("resuming thread {thread_id:?} that was suspended");
3701                store
3702                    .concurrent_state_mut()?
3703                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3704            }
3705            GuestThreadState::Ready { fiber, cancellable } if allow_ready => {
3706                log::trace!("resuming thread {thread_id:?} that was ready");
3707                thread.state = GuestThreadState::Ready { fiber, cancellable };
3708                store
3709                    .concurrent_state_mut()?
3710                    .promote_thread_work_item(guest_thread);
3711            }
3712            other => {
3713                thread.state = other;
3714                bail!(Trap::CannotResumeThread);
3715            }
3716        }
3717        Ok(())
3718    }
3719
3720    fn add_guest_thread_to_instance_table(
3721        self,
3722        thread_id: TableId<GuestThread>,
3723        store: &mut StoreOpaque,
3724        runtime_instance: RuntimeComponentInstanceIndex,
3725    ) -> Result<u32> {
3726        let guest_id = store
3727            .instance_state(self.runtime_instance(runtime_instance))
3728            .thread_handle_table()
3729            .guest_thread_insert(thread_id.rep())?;
3730        store
3731            .concurrent_state_mut()?
3732            .get_mut(thread_id)?
3733            .instance_rep = Some(guest_id);
3734        Ok(guest_id)
3735    }
3736
3737    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3738    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3739    pub(crate) fn suspension_intrinsic(
3740        self,
3741        store: &mut StoreOpaque,
3742        caller: RuntimeComponentInstanceIndex,
3743        cancellable: bool,
3744        yielding: bool,
3745        to_thread: SuspensionTarget,
3746    ) -> Result<WaitResult> {
3747        let guest_thread = store.current_guest_thread()?;
3748        if to_thread.is_none() {
3749            let state = store.concurrent_state_mut()?;
3750            if yielding {
3751                // This is a `thread.yield` call
3752                if !state.may_block(guest_thread.task)? {
3753                    // In a non-blocking context, a `thread.yield` may trigger
3754                    // other threads in the same component instance to run.
3755                    if !state.promote_instance_local_thread_work_item(caller) {
3756                        // No other threads are runnable, so just return
3757                        return Ok(WaitResult::Completed);
3758                    }
3759                }
3760            } else {
3761                // The caller may only call `thread.suspend` from an async task
3762                // (i.e. a task created via a call to an async export).
3763                // Otherwise, we'll trap.
3764                store.check_blocking()?;
3765            }
3766        }
3767
3768        // There could be a pending cancellation from a previous uncancellable wait
3769        if cancellable && store.take_pending_cancellation()? {
3770            return Ok(WaitResult::Cancelled);
3771        }
3772
3773        match to_thread {
3774            SuspensionTarget::SomeSuspended(thread) => {
3775                self.resume_thread(store, caller, thread, true, false)?
3776            }
3777            SuspensionTarget::Some(thread) => {
3778                self.resume_thread(store, caller, thread, true, true)?
3779            }
3780            SuspensionTarget::None => { /* nothing to do */ }
3781        }
3782
3783        let reason = if yielding {
3784            SuspendReason::Yielding {
3785                thread: guest_thread,
3786                cancellable,
3787                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3788                // we're handling a `thread.yield-to-suspended` call; otherwise it would
3789                // panic if we called it in a non-blocking context.
3790                skip_may_block_check: to_thread.is_some(),
3791            }
3792        } else {
3793            SuspendReason::ExplicitlySuspending {
3794                thread: guest_thread,
3795                // Tell `StoreOpaque::suspend` it's okay to suspend here since
3796                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3797                // panic if we called it in a non-blocking context.
3798                skip_may_block_check: to_thread.is_some(),
3799            }
3800        };
3801
3802        store.suspend(reason)?;
3803
3804        if cancellable && store.take_pending_cancellation()? {
3805            Ok(WaitResult::Cancelled)
3806        } else {
3807            Ok(WaitResult::Completed)
3808        }
3809    }
3810
3811    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3812    fn waitable_check(
3813        self,
3814        store: &mut StoreOpaque,
3815        cancellable: bool,
3816        check: WaitableCheck,
3817        params: WaitableCheckParams,
3818    ) -> Result<u32> {
3819        let guest_thread = store.current_guest_thread()?;
3820
3821        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3822
3823        let state = store.concurrent_state_mut()?;
3824        let task = state.get_mut(guest_thread.task)?;
3825
3826        // If we're waiting, and there are no events immediately available,
3827        // suspend the fiber until that changes.
3828        match &check {
3829            WaitableCheck::Wait => {
3830                let set = params.set;
3831
3832                if (task.event.is_none()
3833                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3834                    && state.get_mut(set)?.ready.is_empty()
3835                {
3836                    if cancellable {
3837                        let old = state
3838                            .get_mut(guest_thread.thread)?
3839                            .wake_on_cancel
3840                            .replace(set);
3841                        if !old.is_none() {
3842                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3843                        }
3844                    }
3845
3846                    store.suspend(SuspendReason::Waiting {
3847                        set,
3848                        thread: guest_thread,
3849                        skip_may_block_check: false,
3850                    })?;
3851                }
3852            }
3853            WaitableCheck::Poll => {}
3854        }
3855
3856        log::trace!(
3857            "waitable check for {guest_thread:?}; set {:?}, part two",
3858            params.set
3859        );
3860
3861        // Deliver any pending events to the guest and return.
3862        let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3863
3864        let (ordinal, handle, result) = match &check {
3865            WaitableCheck::Wait => {
3866                let (event, waitable) = match event {
3867                    Some(p) => p,
3868                    None => bail_bug!("event expected to be present"),
3869                };
3870                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3871                let (ordinal, result) = event.parts();
3872                (ordinal, handle, result)
3873            }
3874            WaitableCheck::Poll => {
3875                if let Some((event, waitable)) = event {
3876                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3877                    let (ordinal, result) = event.parts();
3878                    (ordinal, handle, result)
3879                } else {
3880                    log::trace!(
3881                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3882                        guest_thread.task,
3883                        params.set
3884                    );
3885                    let (ordinal, result) = Event::None.parts();
3886                    (ordinal, 0, result)
3887                }
3888            }
3889        };
3890        let memory = self.options_memory_mut(store, params.options);
3891        let ptr = crate::component::func::validate_inbounds_dynamic(
3892            &CanonicalAbiInfo::POINTER_PAIR,
3893            memory,
3894            &ValRaw::u32(params.payload),
3895        )?;
3896        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3897        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3898        Ok(ordinal)
3899    }
3900
    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// `task_id` is the subtask handle in `caller_instance`'s handle table;
    /// it may refer to either a host task or a guest task.  `async_` selects
    /// whether the call may return `BLOCKED` instead of waiting for the
    /// subtask to reach a terminal state.
    ///
    /// On success the returned value is one of:
    ///
    /// - `Status::StartCancelled` if the guest subtask had not yet lowered
    ///   its parameters,
    /// - `BLOCKED` if `async_` is set and the subtask has not yet reached a
    ///   terminal state, or
    /// - `Status::Returned` / `Status::ReturnCancelled`, taken from the
    ///   subtask's pending event.
    ///
    /// Fails with `Trap::SubtaskCancelAfterTerminal` if cancellation was
    /// already recorded for a finished host task, or if the subtask has no
    /// pending `Returned`/`ReturnCancelled` event to report.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result<u32> {
        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        // Resolve the guest-visible handle to the table rep of the host or
        // guest task it names.
        let (rep, is_host) = store
            .instance_state(self.runtime_instance(caller_instance))
            .handle_table()
            .subtask_rep(task_id)?;
        let waitable = if is_host {
            Waitable::Host(TableId::<HostTask>::new(rep))
        } else {
            Waitable::Guest(TableId::<GuestTask>::new(rep))
        };
        let concurrent_state = store.concurrent_state_mut()?;

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Whether the subtask still has to reach a terminal state before a
        // status can be reported; assigned exactly once on every path below
        // that doesn't return early.
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                // If the callee is still running, signal an abort is requested.
                //
                // After cancelling this falls through to block waiting for the
                // host task to actually finish assuming that `async_` is false.
                // This blocking behavior resolves the race of `handle.abort()`
                // with the task actually getting cancelled or finishing.
                HostTaskState::CalleeRunning(handle) => {
                    handle.abort();
                    needs_block = true;
                }

                // The callee has already finished.  If cancellation was
                // already requested then fail, as the task can't be cancelled
                // twice.
                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        // The callee is already done so there's no need to
                        // block further for an event.
                        needs_block = false;
                    }
                }

                // These states should not be possible for a subtask that's
                // visible from the guest, so trap here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let guest_task = TableId::<GuestTask>::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // Not yet started: cancel it outright and report that
                // immediately.
                store.cancel_guest_subtask_without_lowered_parameters(
                    self.runtime_instance(caller_instance),
                    guest_task,
                )?;
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // Look for the first thread of the subtask which is able to
                // observe the cancellation right away, hand control to it,
                // and stop looking.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    let thread_mut = concurrent_state.get_mut(thread.thread)?;
                    if let Some(set) = thread_mut.wake_on_cancel.take() {
                        // The thread is in a cancellable wait, so wake it up:
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        // Yield so that the high-priority item pushed above
                        // gets a chance to run before we inspect the
                        // subtask's state again.
                        let caller = store.current_guest_thread()?;
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        break;
                    } else if let GuestThreadState::Ready {
                        cancellable: true, ..
                    } = &thread_mut.state
                    {
                        // The thread is in a cancellable yield, so yield back
                        // to it.
                        concurrent_state.promote_thread_work_item(thread);
                        let caller = store.current_guest_thread()?;
                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            cancellable: false,
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Guest tasks need to block if they have not yet returned or
                // cancelled, even as a result of the event delivery above.
                needs_block = !store
                    .concurrent_state_mut()?
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                // Already returned or cancelled; any terminal status is
                // picked up from the pending event below.
                needs_block = false;
            }
        };

        // If we need to block waiting on the terminal status of this subtask
        // then return immediately in `async` mode, or otherwise wait for the
        // event to get signaled through the store.
        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done fall through to the shared event handling below.
            store.wait_for_event(waitable)?;

            // .. fall through to determine what event's in store for us.
        }

        // Only a terminal `Returned`/`ReturnCancelled` status is a valid
        // outcome at this point; anything else means there was nothing left
        // to cancel.
        let event = waitable.take_event(store.concurrent_state_mut()?)?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }
4068}
4069
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    ///
    /// `storage` must point to at least `storage_len` valid `ValRaw` items
    /// holding the call's parameters.  `result_count` is either the number
    /// of results expected by a sync-lowered caller or, for an async-lowered
    /// caller, one of the `PREPARE_ASYNC_NO_RESULT` /
    /// `PREPARE_ASYNC_WITH_RESULT` sentinel values.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage` must point to at least `storage_len` items.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the layout of a single
    /// payload item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the layout of a single
    /// payload item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
4237
4238/// SAFETY: See trait docs.
4239impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
4240    unsafe fn prepare_call(
4241        &mut self,
4242        instance: Instance,
4243        memory: *mut VMMemoryDefinition,
4244        start: NonNull<VMFuncRef>,
4245        return_: NonNull<VMFuncRef>,
4246        caller_instance: RuntimeComponentInstanceIndex,
4247        callee_instance: RuntimeComponentInstanceIndex,
4248        task_return_type: TypeTupleIndex,
4249        callee_async: bool,
4250        string_encoding: StringEncoding,
4251        result_count_or_max_if_async: u32,
4252        storage: *mut ValRaw,
4253        storage_len: usize,
4254    ) -> Result<()> {
4255        // SAFETY: The `wasmtime_cranelift`-generated code that calls
4256        // this method will have ensured that `storage` is a valid
4257        // pointer containing at least `storage_len` items.
4258        let params = unsafe { core::slice::from_raw_parts(storage, storage_len) }.to_vec();
4259
4260        unsafe {
4261            instance.prepare_call(
4262                StoreContextMut(self),
4263                start,
4264                return_,
4265                caller_instance,
4266                callee_instance,
4267                task_return_type,
4268                callee_async,
4269                memory,
4270                string_encoding,
4271                match result_count_or_max_if_async {
4272                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
4273                        params,
4274                        has_result: false,
4275                    },
4276                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
4277                        params,
4278                        has_result: true,
4279                    },
4280                    result_count => CallerInfo::Sync {
4281                        params,
4282                        result_count,
4283                    },
4284                },
4285            )
4286        }
4287    }
4288
4289    unsafe fn sync_start(
4290        &mut self,
4291        instance: Instance,
4292        callback: *mut VMFuncRef,
4293        callee: NonNull<VMFuncRef>,
4294        param_count: u32,
4295        storage: *mut MaybeUninit<ValRaw>,
4296        storage_len: usize,
4297    ) -> Result<()> {
4298        unsafe {
4299            instance
4300                .start_call(
4301                    StoreContextMut(self),
4302                    callback,
4303                    ptr::null_mut(),
4304                    callee,
4305                    param_count,
4306                    1,
4307                    START_FLAG_ASYNC_CALLEE,
4308                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
4309                    // this method will have ensured that `storage` is a valid
4310                    // pointer containing at least `storage_len` items.
4311                    Some(core::slice::from_raw_parts_mut(storage, storage_len)),
4312                )
4313                .map(drop)
4314        }
4315    }
4316
4317    unsafe fn async_start(
4318        &mut self,
4319        instance: Instance,
4320        callback: *mut VMFuncRef,
4321        post_return: *mut VMFuncRef,
4322        callee: NonNull<VMFuncRef>,
4323        param_count: u32,
4324        result_count: u32,
4325        flags: u32,
4326    ) -> Result<u32> {
4327        unsafe {
4328            instance.start_call(
4329                StoreContextMut(self),
4330                callback,
4331                post_return,
4332                callee,
4333                param_count,
4334                result_count,
4335                flags,
4336                None,
4337            )
4338        }
4339    }
4340
4341    fn future_write(
4342        &mut self,
4343        instance: Instance,
4344        caller: RuntimeComponentInstanceIndex,
4345        ty: TypeFutureTableIndex,
4346        options: OptionsIndex,
4347        future: u32,
4348        address: u32,
4349    ) -> Result<u32> {
4350        instance
4351            .guest_write(
4352                StoreContextMut(self),
4353                caller,
4354                TransmitIndex::Future(ty),
4355                options,
4356                None,
4357                future,
4358                address,
4359                1,
4360            )
4361            .map(|result| result.encode())
4362    }
4363
4364    fn future_read(
4365        &mut self,
4366        instance: Instance,
4367        caller: RuntimeComponentInstanceIndex,
4368        ty: TypeFutureTableIndex,
4369        options: OptionsIndex,
4370        future: u32,
4371        address: u32,
4372    ) -> Result<u32> {
4373        instance
4374            .guest_read(
4375                StoreContextMut(self),
4376                caller,
4377                TransmitIndex::Future(ty),
4378                options,
4379                None,
4380                future,
4381                address,
4382                1,
4383            )
4384            .map(|result| result.encode())
4385    }
4386
4387    fn stream_write(
4388        &mut self,
4389        instance: Instance,
4390        caller: RuntimeComponentInstanceIndex,
4391        ty: TypeStreamTableIndex,
4392        options: OptionsIndex,
4393        stream: u32,
4394        address: u32,
4395        count: u32,
4396    ) -> Result<u32> {
4397        instance
4398            .guest_write(
4399                StoreContextMut(self),
4400                caller,
4401                TransmitIndex::Stream(ty),
4402                options,
4403                None,
4404                stream,
4405                address,
4406                count,
4407            )
4408            .map(|result| result.encode())
4409    }
4410
4411    fn stream_read(
4412        &mut self,
4413        instance: Instance,
4414        caller: RuntimeComponentInstanceIndex,
4415        ty: TypeStreamTableIndex,
4416        options: OptionsIndex,
4417        stream: u32,
4418        address: u32,
4419        count: u32,
4420    ) -> Result<u32> {
4421        instance
4422            .guest_read(
4423                StoreContextMut(self),
4424                caller,
4425                TransmitIndex::Stream(ty),
4426                options,
4427                None,
4428                stream,
4429                address,
4430                count,
4431            )
4432            .map(|result| result.encode())
4433    }
4434
4435    fn future_drop_writable(
4436        &mut self,
4437        instance: Instance,
4438        ty: TypeFutureTableIndex,
4439        writer: u32,
4440    ) -> Result<()> {
4441        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4442    }
4443
4444    fn flat_stream_write(
4445        &mut self,
4446        instance: Instance,
4447        caller: RuntimeComponentInstanceIndex,
4448        ty: TypeStreamTableIndex,
4449        options: OptionsIndex,
4450        payload_size: u32,
4451        payload_align: u32,
4452        stream: u32,
4453        address: u32,
4454        count: u32,
4455    ) -> Result<u32> {
4456        instance
4457            .guest_write(
4458                StoreContextMut(self),
4459                caller,
4460                TransmitIndex::Stream(ty),
4461                options,
4462                Some(FlatAbi {
4463                    size: payload_size,
4464                    align: payload_align,
4465                }),
4466                stream,
4467                address,
4468                count,
4469            )
4470            .map(|result| result.encode())
4471    }
4472
4473    fn flat_stream_read(
4474        &mut self,
4475        instance: Instance,
4476        caller: RuntimeComponentInstanceIndex,
4477        ty: TypeStreamTableIndex,
4478        options: OptionsIndex,
4479        payload_size: u32,
4480        payload_align: u32,
4481        stream: u32,
4482        address: u32,
4483        count: u32,
4484    ) -> Result<u32> {
4485        instance
4486            .guest_read(
4487                StoreContextMut(self),
4488                caller,
4489                TransmitIndex::Stream(ty),
4490                options,
4491                Some(FlatAbi {
4492                    size: payload_size,
4493                    align: payload_align,
4494                }),
4495                stream,
4496                address,
4497                count,
4498            )
4499            .map(|result| result.encode())
4500    }
4501
4502    fn stream_drop_writable(
4503        &mut self,
4504        instance: Instance,
4505        ty: TypeStreamTableIndex,
4506        writer: u32,
4507    ) -> Result<()> {
4508        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4509    }
4510
4511    fn error_context_debug_message(
4512        &mut self,
4513        instance: Instance,
4514        ty: TypeComponentLocalErrorContextTableIndex,
4515        options: OptionsIndex,
4516        err_ctx_handle: u32,
4517        debug_msg_address: u32,
4518    ) -> Result<()> {
4519        instance.error_context_debug_message(
4520            StoreContextMut(self),
4521            ty,
4522            options,
4523            err_ctx_handle,
4524            debug_msg_address,
4525        )
4526    }
4527
4528    fn thread_new_indirect(
4529        &mut self,
4530        instance: Instance,
4531        caller: RuntimeComponentInstanceIndex,
4532        func_ty_idx: TypeFuncIndex,
4533        start_func_table_idx: RuntimeTableIndex,
4534        start_func_idx: u32,
4535        context: i32,
4536    ) -> Result<u32> {
4537        instance.thread_new_indirect(
4538            StoreContextMut(self),
4539            caller,
4540            func_ty_idx,
4541            start_func_table_idx,
4542            start_func_idx,
4543            context,
4544        )
4545    }
4546}
4547
4548type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4549
4550/// Represents the state of a pending host task.
4551///
4552/// This is used to represent tasks when the guest calls into the host.
4553pub(crate) struct HostTask {
4554    common: WaitableCommon,
4555
4556    /// Guest thread which called the host.
4557    caller: QualifiedThreadId,
4558
4559    /// State of borrows/etc the host needs to track. Used when the guest passes
4560    /// borrows to the host, for example.
4561    call_context: CallContext,
4562
4563    state: HostTaskState,
4564}
4565
4566enum HostTaskState {
4567    /// A host task has been created and it's considered "started".
4568    ///
4569    /// The host task has yet to enter `first_poll` or `poll_and_block` which
4570    /// is where this will get updated further.
4571    CalleeStarted,
4572
4573    /// State used for tasks in `first_poll` meaning that the guest did an async
4574    /// lower of a host async function which is blocked. The specified handle is
4575    /// linked to the future in the main `FuturesUnordered` of a store which is
4576    /// used to cancel it if the guest requests cancellation.
4577    CalleeRunning(JoinHandle),
4578
4579    /// Terminal state used for tasks in `poll_and_block` to store the result of
4580    /// their computation. Note that this state is not used for tasks in
4581    /// `first_poll`.
4582    CalleeFinished(LiftedResult),
4583
4584    /// Terminal state for host tasks meaning that the task was cancelled or the
4585    /// result was taken.
4586    CalleeDone { cancelled: bool },
4587}
4588
4589impl HostTask {
4590    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4591        Self {
4592            common: WaitableCommon::default(),
4593            call_context: CallContext::default(),
4594            caller,
4595            state,
4596        }
4597    }
4598}
4599
4600impl TableDebug for HostTask {
4601    fn type_name() -> &'static str {
4602        "HostTask"
4603    }
4604}
4605
4606type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4607
4608/// Represents the caller of a given guest task.
4609enum Caller {
4610    /// The host called the guest task.
4611    Host {
4612        /// If present, may be used to deliver the result.
4613        tx: Option<oneshot::Sender<LiftedResult>>,
4614        /// If true, there's a host future that must be dropped before the task
4615        /// can be deleted.
4616        host_future_present: bool,
4617        /// Represents the caller of the host function which called back into a
4618        /// guest. Note that this thread could belong to an entirely unrelated
4619        /// top-level component instance than the one the host called into.
4620        caller: CurrentThread,
4621    },
4622    /// Another guest thread called the guest task
4623    Guest {
4624        /// The id of the caller
4625        thread: QualifiedThreadId,
4626    },
4627}
4628
4629/// Represents a closure and related canonical ABI parameters required to
4630/// validate a `task.return` call at runtime and lift the result.
4631struct LiftResult {
4632    lift: RawLift,
4633    ty: TypeTupleIndex,
4634    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4635    string_encoding: StringEncoding,
4636}
4637
4638/// The table ID for a guest thread, qualified by the task to which it belongs.
4639///
4640/// This exists to minimize table lookups and the necessity to pass stores around mutably
4641/// for the common case of identifying the task to which a thread belongs.
4642#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4643pub(crate) struct QualifiedThreadId {
4644    task: TableId<GuestTask>,
4645    thread: TableId<GuestThread>,
4646}
4647
4648impl QualifiedThreadId {
4649    fn qualify(
4650        state: &mut ConcurrentState,
4651        thread: TableId<GuestThread>,
4652    ) -> Result<QualifiedThreadId> {
4653        Ok(QualifiedThreadId {
4654            task: state.get_mut(thread)?.parent_task,
4655            thread,
4656        })
4657    }
4658}
4659
4660impl fmt::Debug for QualifiedThreadId {
4661    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4662        f.debug_tuple("QualifiedThreadId")
4663            .field(&self.task.rep())
4664            .field(&self.thread.rep())
4665            .finish()
4666    }
4667}
4668
4669enum GuestThreadState {
4670    NotStartedImplicit,
4671    NotStartedExplicit(
4672        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4673    ),
4674    Running,
4675    Suspended(StoreFiber<'static>),
4676    Ready {
4677        fiber: StoreFiber<'static>,
4678        cancellable: bool,
4679    },
4680    Completed,
4681}
4682pub struct GuestThread {
4683    /// Context-local state used to implement the `context.{get,set}`
4684    /// intrinsics.
4685    context: [u32; NUM_COMPONENT_CONTEXT_SLOTS],
4686    /// The owning guest task.
4687    parent_task: TableId<GuestTask>,
4688    /// If present, indicates that the thread is currently waiting on the
4689    /// specified set but may be cancelled and woken immediately.
4690    wake_on_cancel: Option<TableId<WaitableSet>>,
4691    /// The execution state of this guest thread
4692    state: GuestThreadState,
4693    /// The index of this thread in the component instance's handle table.
4694    /// This must always be `Some` after initialization.
4695    instance_rep: Option<u32>,
4696    /// Scratch waitable set used to watch subtasks during synchronous calls.
4697    sync_call_set: TableId<WaitableSet>,
4698}
4699
4700impl GuestThread {
4701    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4702    /// handle.
4703    fn from_instance(
4704        state: Pin<&mut ComponentInstance>,
4705        caller_instance: RuntimeComponentInstanceIndex,
4706        guest_thread: u32,
4707    ) -> Result<TableId<Self>> {
4708        let rep = state.instance_states().0[caller_instance]
4709            .thread_handle_table()
4710            .guest_thread_rep(guest_thread)?;
4711        Ok(TableId::new(rep))
4712    }
4713
4714    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4715        let sync_call_set = state.push(WaitableSet {
4716            is_sync_call_set: true,
4717            ..WaitableSet::default()
4718        })?;
4719        Ok(Self {
4720            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4721            parent_task,
4722            wake_on_cancel: None,
4723            state: GuestThreadState::NotStartedImplicit,
4724            instance_rep: None,
4725            sync_call_set,
4726        })
4727    }
4728
4729    fn new_explicit(
4730        state: &mut ConcurrentState,
4731        parent_task: TableId<GuestTask>,
4732        start_func: Box<
4733            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4734        >,
4735    ) -> Result<Self> {
4736        let sync_call_set = state.push(WaitableSet {
4737            is_sync_call_set: true,
4738            ..WaitableSet::default()
4739        })?;
4740        Ok(Self {
4741            context: [0; NUM_COMPONENT_CONTEXT_SLOTS],
4742            parent_task,
4743            wake_on_cancel: None,
4744            state: GuestThreadState::NotStartedExplicit(start_func),
4745            instance_rep: None,
4746            sync_call_set,
4747        })
4748    }
4749}
4750
4751impl TableDebug for GuestThread {
4752    fn type_name() -> &'static str {
4753        "GuestThread"
4754    }
4755}
4756
4757enum SyncResult {
4758    NotProduced,
4759    Produced(Option<ValRaw>),
4760    Taken,
4761}
4762
4763impl SyncResult {
4764    fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4765        Ok(match mem::replace(self, SyncResult::Taken) {
4766            SyncResult::NotProduced => None,
4767            SyncResult::Produced(val) => Some(val),
4768            SyncResult::Taken => {
4769                bail_bug!("attempted to take a synchronous result that was already taken")
4770            }
4771        })
4772    }
4773}
4774
/// State of the host future associated with a `GuestTask`, if any.
#[derive(Debug)]
enum HostFutureState {
    /// No host future is tracked: chosen by `GuestTask::new` for guest
    /// callers and for host callers without `host_future_present`.
    NotApplicable,
    /// A host future exists; `GuestTask::ready_to_delete` returns `false`
    /// while the task is in this state.
    Live,
    /// The host future has been dropped.
    // NOTE(review): where this state is entered is not visible here --
    // confirm.
    Dropped,
}
4781
4782/// Represents a pending guest task.
4783pub(crate) struct GuestTask {
4784    /// See `WaitableCommon`
4785    common: WaitableCommon,
4786    /// Closure to lower the parameters passed to this task.
4787    lower_params: Option<RawLower>,
4788    /// See `LiftResult`
4789    lift_result: Option<LiftResult>,
4790    /// A place to stash the type-erased lifted result if it can't be delivered
4791    /// immediately.
4792    result: Option<LiftedResult>,
4793    /// Closure to call the callback function for an async-lifted export, if
4794    /// provided.
4795    callback: Option<CallbackFn>,
4796    /// See `Caller`
4797    caller: Caller,
4798    /// Borrow state for this task.
4799    ///
4800    /// Keeps track of `borrow<T>` received to this task to ensure that
4801    /// everything is dropped by the time it exits.
4802    call_context: CallContext,
4803    /// A place to stash the lowered result for a sync-to-async call until it
4804    /// can be returned to the caller.
4805    sync_result: SyncResult,
4806    /// Whether or not the task has been cancelled (i.e. whether the task is
4807    /// permitted to call `task.cancel`).
4808    cancel_sent: bool,
4809    /// Whether or not we've sent a `Status::Starting` event to any current or
4810    /// future waiters for this waitable.
4811    starting_sent: bool,
4812    /// The runtime instance to which the exported function for this guest task
4813    /// belongs.
4814    ///
4815    /// Note that the task may do a sync->sync call via a fused adapter which
4816    /// results in that task executing code in a different instance, and it may
4817    /// call host functions and intrinsics from that other instance.
4818    instance: RuntimeInstance,
4819    /// If present, a pending `Event::None` or `Event::Cancelled` to be
4820    /// delivered to this task.
4821    event: Option<Event>,
4822    /// Whether or not the task has exited.
4823    exited: bool,
4824    /// Threads belonging to this task
4825    threads: HashSet<TableId<GuestThread>>,
4826    /// The state of the host future that represents an async task, which must
4827    /// be dropped before we can delete the task.
4828    host_future_state: HostFutureState,
4829    /// Indicates whether this task was created for a call to an async-lifted
4830    /// export.
4831    async_function: bool,
4832
4833    decremented_interesting_task_count: bool,
4834}
4835
4836impl GuestTask {
4837    fn already_lowered_parameters(&self) -> bool {
4838        // We reset `lower_params` after we lower the parameters
4839        self.lower_params.is_none()
4840    }
4841
4842    fn returned_or_cancelled(&self) -> bool {
4843        // We reset `lift_result` after we return or exit
4844        self.lift_result.is_none()
4845    }
4846
4847    fn ready_to_delete(&self) -> bool {
4848        let threads_completed = self.threads.is_empty();
4849        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4850        let pending_completion_event = matches!(
4851            self.common.event,
4852            Some(Event::Subtask {
4853                status: Status::Returned | Status::ReturnCancelled
4854            })
4855        );
4856        let ready = threads_completed
4857            && !has_sync_result
4858            && !pending_completion_event
4859            && !matches!(self.host_future_state, HostFutureState::Live);
4860        log::trace!(
4861            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4862            threads_completed,
4863            has_sync_result,
4864            pending_completion_event,
4865            self.host_future_state
4866        );
4867        ready
4868    }
4869
4870    fn new(
4871        state: &mut ConcurrentState,
4872        lower_params: RawLower,
4873        lift_result: LiftResult,
4874        caller: Caller,
4875        callback: Option<CallbackFn>,
4876        instance: RuntimeInstance,
4877        async_function: bool,
4878    ) -> Result<QualifiedThreadId> {
4879        let host_future_state = match &caller {
4880            Caller::Guest { .. } => HostFutureState::NotApplicable,
4881            Caller::Host {
4882                host_future_present,
4883                ..
4884            } => {
4885                if *host_future_present {
4886                    HostFutureState::Live
4887                } else {
4888                    HostFutureState::NotApplicable
4889                }
4890            }
4891        };
4892        let task = state.push(Self {
4893            common: WaitableCommon::default(),
4894            lower_params: Some(lower_params),
4895            lift_result: Some(lift_result),
4896            result: None,
4897            callback,
4898            caller,
4899            call_context: CallContext::default(),
4900            sync_result: SyncResult::NotProduced,
4901            cancel_sent: false,
4902            starting_sent: false,
4903            instance,
4904            event: None,
4905            exited: false,
4906            threads: HashSet::new(),
4907            host_future_state,
4908            async_function,
4909            decremented_interesting_task_count: false,
4910        })?;
4911        let new_thread = GuestThread::new_implicit(state, task)?;
4912        let thread = state.push(new_thread)?;
4913        state.get_mut(task)?.threads.insert(thread);
4914        state.interesting_tasks += 1;
4915        Ok(QualifiedThreadId { task, thread })
4916    }
4917}
4918
4919impl TableDebug for GuestTask {
4920    fn type_name() -> &'static str {
4921        "GuestTask"
4922    }
4923}
4924
4925/// Represents state common to all kinds of waitables.
4926#[derive(Default)]
4927struct WaitableCommon {
4928    /// The currently pending event for this waitable, if any.
4929    event: Option<Event>,
4930    /// The set to which this waitable belongs, if any.
4931    set: Option<TableId<WaitableSet>>,
4932    /// The handle with which the guest refers to this waitable, if any.
4933    handle: Option<u32>,
4934}
4935
4936/// Represents a Component Model Async `waitable`.
4937#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4938enum Waitable {
4939    /// A host task
4940    Host(TableId<HostTask>),
4941    /// A guest task
4942    Guest(TableId<GuestTask>),
4943    /// The read or write end of a stream or future
4944    Transmit(TableId<TransmitHandle>),
4945}
4946
4947impl Waitable {
4948    /// Retrieve the `Waitable` corresponding to the specified guest-visible
4949    /// handle.
4950    fn from_instance(
4951        state: Pin<&mut ComponentInstance>,
4952        caller_instance: RuntimeComponentInstanceIndex,
4953        waitable: u32,
4954    ) -> Result<Self> {
4955        use crate::runtime::vm::component::Waitable;
4956
4957        let (waitable, kind) = state.instance_states().0[caller_instance]
4958            .handle_table()
4959            .waitable_rep(waitable)?;
4960
4961        Ok(match kind {
4962            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4963            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4964            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4965        })
4966    }
4967
4968    /// Retrieve the host-visible identifier for this `Waitable`.
4969    fn rep(&self) -> u32 {
4970        match self {
4971            Self::Host(id) => id.rep(),
4972            Self::Guest(id) => id.rep(),
4973            Self::Transmit(id) => id.rep(),
4974        }
4975    }
4976
4977    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4978    /// remove it from any set it may currently belong to (when `set` is
4979    /// `None`).
4980    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4981        log::trace!("waitable {self:?} join set {set:?}");
4982
4983        let old = mem::replace(&mut self.common(state)?.set, set);
4984
4985        if let Some(old) = old {
4986            match *self {
4987                Waitable::Host(id) => state.remove_child(id, old),
4988                Waitable::Guest(id) => state.remove_child(id, old),
4989                Waitable::Transmit(id) => state.remove_child(id, old),
4990            }?;
4991
4992            state.get_mut(old)?.ready.remove(self);
4993        }
4994
4995        if let Some(set) = set {
4996            match *self {
4997                Waitable::Host(id) => state.add_child(id, set),
4998                Waitable::Guest(id) => state.add_child(id, set),
4999                Waitable::Transmit(id) => state.add_child(id, set),
5000            }?;
5001
5002            if self.common(state)?.event.is_some() {
5003                self.mark_ready(state)?;
5004            }
5005        }
5006
5007        Ok(())
5008    }
5009
5010    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
5011    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
5012        Ok(match self {
5013            Self::Host(id) => &mut state.get_mut(*id)?.common,
5014            Self::Guest(id) => &mut state.get_mut(*id)?.common,
5015            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
5016        })
5017    }
5018
5019    /// Set or clear the pending event for this waitable and either deliver it
5020    /// to the first waiter, if any, or mark it as ready to be delivered to the
5021    /// next waiter that arrives.
5022    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
5023        log::trace!("set event for {self:?}: {event:?}");
5024        self.common(state)?.event = event;
5025        self.mark_ready(state)
5026    }
5027
5028    /// Take the pending event from this waitable, leaving `None` in its place.
5029    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
5030        let common = self.common(state)?;
5031        let event = common.event.take();
5032        if let Some(set) = self.common(state)?.set {
5033            state.get_mut(set)?.ready.remove(self);
5034        }
5035
5036        Ok(event)
5037    }
5038
5039    /// Deliver the current event for this waitable to the first waiter, if any,
5040    /// or else mark it as ready to be delivered to the next waiter that
5041    /// arrives.
5042    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
5043        if let Some(set) = self.common(state)?.set {
5044            state.get_mut(set)?.ready.insert(*self);
5045            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
5046                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
5047                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
5048
5049                let item = match mode {
5050                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
5051                    WaitMode::Callback(instance) => WorkItem::GuestCall(
5052                        state.get_mut(thread.task)?.instance.index,
5053                        GuestCall {
5054                            thread,
5055                            kind: GuestCallKind::DeliverEvent {
5056                                instance,
5057                                set: Some(set),
5058                            },
5059                        },
5060                    ),
5061                };
5062                state.push_high_priority(item);
5063            }
5064        }
5065        Ok(())
5066    }
5067
5068    /// Remove this waitable from the instance's rep table.
5069    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
5070        match self {
5071            Self::Host(task) => {
5072                log::trace!("delete host task {task:?}");
5073                state.delete(*task)?;
5074            }
5075            Self::Guest(task) => {
5076                log::trace!("delete guest task {task:?}");
5077                let task = state.delete(*task)?;
5078
5079                // When a guest task is created it increments the
5080                // `ConcurrentState::interesting_tasks` counter, and that needs
5081                // to be paired with a decrement. There are a few situations in
5082                // which the decrement needs to happen which don't all funnel
5083                // through here, so in lieu of that at least try to catch issues
5084                // where we forgot to do a decrement.
5085                debug_assert!(task.decremented_interesting_task_count);
5086            }
5087            Self::Transmit(task) => {
5088                state.delete(*task)?;
5089            }
5090        }
5091
5092        Ok(())
5093    }
5094}
5095
5096impl fmt::Debug for Waitable {
5097    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5098        match self {
5099            Self::Host(id) => write!(f, "{id:?}"),
5100            Self::Guest(id) => write!(f, "{id:?}"),
5101            Self::Transmit(id) => write!(f, "{id:?}"),
5102        }
5103    }
5104}
5105
5106/// Represents a Component Model Async `waitable-set`.
5107#[derive(Default)]
5108struct WaitableSet {
5109    /// Which waitables in this set have pending events, if any.
5110    ready: BTreeSet<Waitable>,
5111    /// Which guest threads are currently waiting on this set, if any.
5112    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
5113    /// Whether this set is a synthetic, internal one meant for handling
5114    /// synchronous calls.
5115    is_sync_call_set: bool,
5116}
5117
5118impl TableDebug for WaitableSet {
5119    fn type_name() -> &'static str {
5120        "WaitableSet"
5121    }
5122}
5123
5124/// Type-erased closure to lower the parameters for a guest task.
5125type RawLower =
5126    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
5127
5128/// Type-erased closure to lift the result for a guest task.
5129type RawLift = Box<
5130    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5131>;
5132
/// The type-erased result of a guest task.
///
/// A host caller may downcast it to the type it expects; for a guest caller
/// it is simply ignored (see `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;
5137
/// Placeholder returned from a `LiftFn` when the real result has already been
/// lowered into a guest task's stack and linear memory.
struct DummyResult;
5141
5142/// Represents the Component Model Async state of a (sub-)component instance.
5143#[derive(Default)]
5144pub struct ConcurrentInstanceState {
5145    /// Whether backpressure is set for this instance (enabled if >0)
5146    backpressure: u16,
5147    /// Whether this instance can be entered
5148    do_not_enter: bool,
5149    /// Pending calls for this instance which require `Self::backpressure` to be
5150    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
5151    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
5152}
5153
5154impl ConcurrentInstanceState {
5155    pub fn pending_is_empty(&self) -> bool {
5156        self.pending.is_empty()
5157    }
5158}
5159
5160#[derive(Debug, Copy, Clone)]
5161pub(crate) enum CurrentThread {
5162    Guest(QualifiedThreadId),
5163    Host(TableId<HostTask>),
5164    None,
5165}
5166
5167impl CurrentThread {
5168    fn guest(&self) -> Option<&QualifiedThreadId> {
5169        match self {
5170            Self::Guest(id) => Some(id),
5171            _ => None,
5172        }
5173    }
5174
5175    fn host(&self) -> Option<TableId<HostTask>> {
5176        match self {
5177            Self::Host(id) => Some(*id),
5178            _ => None,
5179        }
5180    }
5181
5182    fn is_none(&self) -> bool {
5183        matches!(self, Self::None)
5184    }
5185}
5186
5187impl From<QualifiedThreadId> for CurrentThread {
5188    fn from(id: QualifiedThreadId) -> Self {
5189        Self::Guest(id)
5190    }
5191}
5192
5193impl From<TableId<HostTask>> for CurrentThread {
5194    fn from(id: TableId<HostTask>) -> Self {
5195        Self::Host(id)
5196    }
5197}
5198
5199/// Represents the Component Model Async state of a store.
5200pub struct ConcurrentState {
5201    /// The currently running thread, if any.
5202    ///
5203    /// Note that we lazily materialize threads on-demand and this field is not
5204    /// necessarily up-to-date. The `StoreOpaque::current_thread` method should
5205    /// be preferred over directly accessing this field.
5206    unforced_current_thread: CurrentThread,
5207
5208    /// The set of pending host and background tasks, if any.
5209    ///
5210    /// See `ComponentInstance::poll_until` for where we temporarily take this
5211    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
5212    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
5213    /// The table of waitables, waitable sets, etc.
5214    table: AlwaysMut<ResourceTable>,
5215    /// The "high priority" work queue for this store's event loop.
5216    high_priority: Vec<WorkItem>,
5217    /// The "low priority" work queue for this store's event loop.
5218    low_priority: VecDeque<WorkItem>,
5219    /// A place to stash the reason a fiber is suspending so that the code which
5220    /// resumed it will know under what conditions the fiber should be resumed
5221    /// again.
5222    suspend_reason: Option<SuspendReason>,
5223    /// A cached fiber which is waiting for work to do.
5224    ///
5225    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
5226    worker: Option<StoreFiber<'static>>,
5227    /// A place to stash the work item for which we're resuming a worker fiber.
5228    worker_item: Option<WorkerItem>,
5229
5230    /// Reference counts for all component error contexts
5231    ///
5232    /// NOTE: it is possible the global ref count to be *greater* than the sum of
5233    /// (sub)component ref counts as tracked by `error_context_tables`, for
5234    /// example when the host holds one or more references to error contexts.
5235    ///
5236    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
5237    /// component-wide representation) of the index into concurrent state for a given
5238    /// stored `ErrorContext`.
5239    ///
5240    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
5241    /// as a `TableId<ErrorContextState>`.
5242    global_error_context_ref_counts:
5243        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
5244
5245    /// The number of "interesting tasks" currently executing in the store.
5246    ///
5247    /// This tracks the concept of a component instance lifetime as defined in
5248    /// https://github.com/WebAssembly/component-model/pull/643. Specifically
5249    /// all tasks currently increment this counter which then gets decremented
5250    /// when they exit. In the future some tasks might not increment this
5251    /// counter, but for now all do.
5252    ///
5253    /// This is used to implement `Accessor::poll_no_interesting_tasks` to
5254    /// inform the embedder when all tasks have completed. This is then
5255    /// used in wasmtime-wasi-http, for example, to know when an instance is
5256    /// idle.
5257    interesting_tasks: usize,
5258
5259    /// Single waker to notify when `interesting_tasks` reaches 0.
5260    ///
5261    /// Used in the implementation of `Accessor::poll_no_interesting_tasks`.
5262    interesting_tasks_empty_waker: Option<Waker>,
5263}
5264
5265impl Default for ConcurrentState {
5266    fn default() -> Self {
5267        Self {
5268            unforced_current_thread: CurrentThread::None,
5269            table: AlwaysMut::new(ResourceTable::new()),
5270            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
5271            high_priority: Vec::new(),
5272            low_priority: VecDeque::new(),
5273            suspend_reason: None,
5274            worker: None,
5275            worker_item: None,
5276            global_error_context_ref_counts: BTreeMap::new(),
5277            interesting_tasks: 0,
5278            interesting_tasks_empty_waker: None,
5279        }
5280    }
5281}
5282
5283impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.  This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Collected fibers are appended to `fibers`, and the `FuturesUnordered`
    /// (if it is still present) is appended to `futures`.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Walk every table entry looking for fibers parked in waitable sets
        // or held by guest threads.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                // Empty the set's `waiting` map, keeping only fiber waiters.
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                // Every guest thread is marked `Completed`; only the
                // `Suspended` and `Ready` states carry a fiber to collect.
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Drains a single queued work item: fibers are collected, pending
        // futures are moved into `self.futures`, and everything else is
        // dropped.  The `unwrap` panics if `self.futures` is currently taken.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        // Hand over the futures last so that any `PushFuture` items drained
        // above are included.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }
5351
    /// Trace the GC roots of every fiber reachable from this state.
    ///
    /// This visits fibers parked in waitable sets, fibers held by suspended
    /// or ready guest threads, the worker fiber (if any), and fibers queued as
    /// `ResumeFiber` work items, calling `trace_gc_roots` on each with the
    /// provided `modules`, `unwind`, and `gc_roots_list`.
    #[cfg(feature = "gc")]
    pub(crate) fn trace_fiber_roots(
        &mut self,
        modules: &ModuleRegistry,
        unwind: &dyn Unwind,
        gc_roots_list: &mut GcRootsList,
    ) {
        // Destructure without `..` so that adding a field to
        // `ConcurrentState` forces a decision here about whether it needs
        // tracing.
        let ConcurrentState {
            table,
            worker,
            high_priority,
            low_priority,

            // TODO(cm-gc): This field contains `ValRaw`s, but they are never GC
            // references because the component model doesn't support GC yet. We
            // will need to trace these somehow when it does.
            futures: _,

            // These fields do not contain GC references.
            worker_item: _,
            unforced_current_thread: _,
            suspend_reason: _,
            global_error_context_ref_counts: _,
            interesting_tasks: _,
            interesting_tasks_empty_waker: _,
        } = self;

        // Fibers stored in table entries: waiters on waitable sets and guest
        // threads that are suspended or ready to run.
        for entry in table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in set.waiting.values_mut() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fiber.trace_gc_roots(modules, unwind, gc_roots_list);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready { fiber, .. } =
                    &mut thread.state
                {
                    fiber.trace_gc_roots(modules, unwind, gc_roots_list);
                }
            }
        }

        if let Some(fiber) = worker {
            fiber.trace_gc_roots(modules, unwind, gc_roots_list);
        }

        // Only `ResumeFiber` work items currently carry anything to trace.
        let mut handle_item = |item: &mut WorkItem| match item {
            WorkItem::ResumeFiber(fiber) => {
                fiber.trace_gc_roots(modules, unwind, gc_roots_list);
            }
            WorkItem::PushFuture(_future) => {
                // TODO(cm-gc): once futures can contain GC roots, we will need
                // to trace them.
            }
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in high_priority {
            handle_item(item);
        }
        for item in low_priority {
            handle_item(item);
        }
    }
5418
5419    fn push<V: Send + Sync + 'static>(
5420        &mut self,
5421        value: V,
5422    ) -> Result<TableId<V>, ResourceTableError> {
5423        self.table.get_mut().push(value).map(TableId::from)
5424    }
5425
5426    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5427        self.table.get_mut().get_mut(&Resource::from(id))
5428    }
5429
5430    pub fn add_child<T: 'static, U: 'static>(
5431        &mut self,
5432        child: TableId<T>,
5433        parent: TableId<U>,
5434    ) -> Result<(), ResourceTableError> {
5435        self.table
5436            .get_mut()
5437            .add_child(Resource::from(child), Resource::from(parent))
5438    }
5439
5440    pub fn remove_child<T: 'static, U: 'static>(
5441        &mut self,
5442        child: TableId<T>,
5443        parent: TableId<U>,
5444    ) -> Result<(), ResourceTableError> {
5445        self.table
5446            .get_mut()
5447            .remove_child(Resource::from(child), Resource::from(parent))
5448    }
5449
5450    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5451        self.table.get_mut().delete(Resource::from(id))
5452    }
5453
5454    fn push_future(&mut self, future: HostTaskFuture) {
5455        // Note that we can't directly push to `ConcurrentState::futures` here
5456        // since this may be called from a future that's being polled inside
5457        // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5458        // so it has exclusive access while polling it.  Therefore, we push a
5459        // work item to the "high priority" queue, which will actually push to
5460        // `ConcurrentState::futures` later.
5461        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5462    }
5463
    /// Append `item` to the high-priority work queue, logging it at trace
    /// level first.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }
5468
    /// Insert `item` at the front of the low-priority work deque, logging it
    /// at trace level first.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push_front(item);
    }
5473
5474    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5475        if high_priority {
5476            self.push_high_priority(item);
5477        } else {
5478            self.push_low_priority(item);
5479        }
5480    }
5481
5482    fn promote_instance_local_thread_work_item(
5483        &mut self,
5484        current_instance: RuntimeComponentInstanceIndex,
5485    ) -> bool {
5486        self.promote_work_items_matching(|item: &WorkItem| match item {
5487            WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5488                *instance == current_instance
5489            }
5490            _ => false,
5491        })
5492    }
5493
5494    fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5495        self.promote_work_items_matching(|item: &WorkItem| match item {
5496            WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5497                *t == thread
5498            }
5499            _ => false,
5500        })
5501    }
5502
5503    fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5504    where
5505        F: FnMut(&WorkItem) -> bool,
5506    {
5507        // If there's a high-priority work item to resume the current guest thread,
5508        // we don't need to promote anything, but we return true to indicate that
5509        // work is pending for the current instance.
5510        if self.high_priority.iter().any(&mut predicate) {
5511            true
5512        }
5513        // Otherwise, look for a low-priority work item that matches the current
5514        // instance and promote it to high-priority.
5515        else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5516            let item = self.low_priority.remove(idx).unwrap();
5517            self.push_high_priority(item);
5518            true
5519        } else {
5520            false
5521        }
5522    }
5523
5524    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5525        if self.may_block(task)? {
5526            Ok(())
5527        } else {
5528            Err(Trap::CannotBlockSyncTask.into())
5529        }
5530    }
5531
5532    fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5533        let task = self.get_mut(task)?;
5534        Ok(task.async_function || task.returned_or_cancelled())
5535    }
5536
5537    /// Used by `ResourceTables` to acquire the current `CallContext` for the
5538    /// specified task.
5539    ///
5540    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5541    /// below.
5542    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5543        let (task, is_host) = (task >> 1, task & 1 == 1);
5544        if is_host {
5545            let task: TableId<HostTask> = TableId::new(task);
5546            Ok(&mut self.get_mut(task)?.call_context)
5547        } else {
5548            let task: TableId<GuestTask> = TableId::new(task);
5549            Ok(&mut self.get_mut(task)?.call_context)
5550        }
5551    }
5552
5553    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5554        match self.futures.get_mut().as_mut() {
5555            Some(f) => Ok(f),
5556            None => bail_bug!("futures field of concurrent state is currently taken"),
5557        }
5558    }
5559
    /// Borrow the underlying `ResourceTable` mutably.
    pub(crate) fn table(&mut self) -> &mut ResourceTable {
        self.table.get_mut()
    }
5563
5564    /// Returns the parent thread, if any, of `cur`.
5565    fn parent(&mut self, cur: CurrentThread) -> Option<CurrentThread> {
5566        match cur {
5567            CurrentThread::Guest(thread) => {
5568                let task = self.get_mut(thread.task).ok()?;
5569                Some(match task.caller {
5570                    Caller::Host { caller, .. } => caller,
5571                    Caller::Guest { thread } => thread.into(),
5572                })
5573            }
5574            CurrentThread::Host(id) => Some(self.get_mut(id).ok()?.caller.into()),
5575            CurrentThread::None => None,
5576        }
5577    }
5578}
5579
5580/// Provide a type hint to compiler about the shape of a parameter lower
5581/// closure.
5582fn for_any_lower<
5583    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5584>(
5585    fun: F,
5586) -> F {
5587    fun
5588}
5589
5590/// Provide a type hint to compiler about the shape of a result lift closure.
5591fn for_any_lift<
5592    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5593>(
5594    fun: F,
5595) -> F {
5596    fun
5597}
5598
5599fn check_ambient_store(id: StoreId) {
5600    let message = "\
5601        `Future`s which depend on asynchronous component tasks, streams, or \
5602        futures to complete may only be polled from the event loop of the \
5603        store to which they belong.  Please use \
5604        `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5605    ";
5606    tls::try_get(|store| {
5607        let matched = match store {
5608            tls::TryGet::Some(store) => store.id() == id,
5609            tls::TryGet::Taken | tls::TryGet::None => false,
5610        };
5611
5612        if !matched {
5613            panic!("{message}")
5614        }
5615    });
5616}
5617
5618/// Assert that `StoreContextMut::run_concurrent` has not been called from
5619/// within an store's event loop.
5620fn check_recursive_run() {
5621    tls::try_get(|store| {
5622        if !matches!(store, tls::TryGet::None) {
5623            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5624        }
5625    });
5626}
5627
/// Split a packed callback return value into `(low, high)`, where `low` is
/// the lowest four bits of `code` and `high` is everything above them.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low = code & 0xF;
    let high = code >> 4;
    (low, high)
}
5631
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// Table id of the waitable set being waited on or polled.
    set: TableId<WaitableSet>,
    /// Index of the canonical options in effect for the call.
    options: OptionsIndex,
    // NOTE(review): presumably the guest address the event payload is written
    // to; confirm against `waitable_check`.
    payload: u32,
}
5640
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The call is for `waitable-set.wait`.
    Wait,
    /// The call is for `waitable-set.poll`.
    Poll,
}
5647
/// An identifier representing a guest task within a component.
///
/// This can be acquired by calling [`Func::start_call_concurrent`] or
/// [`TypedFunc::start_call_concurrent`] and then using the
/// [`FuncCallConcurrent::task`] accessor, for example. This can then be
/// reflected on with [`StoreContextMut::async_call_stack`].
///
/// [`TypedFunc::start_call_concurrent`]: crate::component::TypedFunc::start_call_concurrent
// Newtype over the task's table id; the field is private, so the id stays
// opaque to code outside this module.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct GuestTaskId(TableId<GuestTask>);
5658
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// The type parameter `R` is the result type the caller expects once the
/// call's result has been lifted.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The instance that this call is prepared for.
    runtime_instance: RuntimeInstance,
    /// Records the result type `R` without storing a value of that type.
    _phantom: PhantomData<R>,
}
5674
5675impl<R> PreparedCall<R> {
5676    /// Get a copy of the `TaskId` for this `PreparedCall`.
5677    pub(crate) fn task_id(&self) -> TaskId {
5678        TaskId {
5679            task: self.thread.task,
5680            runtime_instance: self.runtime_instance,
5681        }
5682    }
5683}
5684
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task.
    task: TableId<GuestTask>,
    /// The runtime instance the call was prepared for.
    runtime_instance: RuntimeInstance,
}
5690
impl TaskId {
    /// The host future for an async task was dropped.
    ///
    /// If the parameters have not been lowered yet, it is no longer valid to
    /// do so, as the lowering closure would see a dangling pointer. In this
    /// case, we delete the task eagerly. Otherwise, there may be running
    /// threads, or ones that are suspended and can be resumed by other tasks
    /// for this component, so we mark the future as dropped and delete the
    /// task when all threads are done.
    ///
    /// # Errors
    ///
    /// Propagates any error from looking up the task, cancelling it, or
    /// deleting it from the concurrent state.
    pub(crate) fn host_future_dropped(&self, store: &mut StoreOpaque) -> Result<()> {
        let task = store.concurrent_state_mut()?.get_mut(self.task)?;
        let delete = if !task.already_lowered_parameters() {
            // Parameters were never lowered: cancel the subtask and delete it
            // right away.
            store.cancel_guest_subtask_without_lowered_parameters(
                self.runtime_instance,
                self.task,
            )?;
            true
        } else {
            // Record the drop; the task is deleted now only if it reports
            // itself as ready to delete.
            task.host_future_state = HostFutureState::Dropped;
            task.ready_to_delete()
        };
        if delete {
            Waitable::Guest(self.task).delete_from(store.concurrent_state_mut()?)?
        }
        Ok(())
    }
}
5715
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
///
/// `param_count` is stored in the returned `PreparedCall` as the number of
/// lowered core Wasm parameters, and `host_future_present` is recorded in the
/// new task's `Caller::Host` entry.
///
/// # Errors
///
/// Fails with `Trap::CannotEnterComponent` if the target instance may not be
/// entered, and propagates any error from creating the guest task or
/// accessing the store's concurrent state.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Gather everything needed from the component's metadata before taking a
    // mutable borrow of the concurrent state below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let caller = store.0.current_thread()?;
    let state = store.0.concurrent_state_mut()?;

    // The sender goes into the new task; the receiver is handed back to the
    // caller inside the `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    // From here on `instance` names the runtime instance being called rather
    // than the component instance data looked up above.
    let instance = handle.instance().runtime_instance(component_instance);
    let thread = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // NOTE(review): this check runs after `GuestTask::new` has already been
    // called; confirm the task is cleaned up when the trap is raised here.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread,
        param_count,
        runtime_instance: instance,
        rx,
        _phantom: PhantomData,
    })
}
5801
/// A guest call that has been queued on a store's event loop and whose
/// result, of type `R`, is still pending.
pub(crate) struct QueuedCall<R> {
    /// Id of the store the call was queued on.
    store: StoreId,
    /// Table id of the guest task running the call.
    task: TableId<GuestTask>,
    /// Receives the lifted result of the call.
    rx: oneshot::Receiver<LiftedResult>,
    /// Records the result type `R` without storing a value of that type.
    _marker: PhantomData<fn() -> R>,
}
5808
5809impl<R> QueuedCall<R> {
5810    /// Queue a call previously prepared using `prepare_call` to be run as part of
5811    /// the associated `ComponentInstance`'s event loop.
5812    ///
5813    /// The returned future will resolve to the result once it is available, but
5814    /// must only be polled via the instance's event loop. See
5815    /// `StoreContextMut::run_concurrent` for details.
5816    pub(crate) fn new<T: 'static>(
5817        mut store: StoreContextMut<T>,
5818        prepared: PreparedCall<R>,
5819    ) -> Result<QueuedCall<R>> {
5820        let PreparedCall {
5821            handle,
5822            thread,
5823            param_count,
5824            rx,
5825            ..
5826        } = prepared;
5827
5828        queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5829
5830        Ok(QueuedCall {
5831            store: store.0.id(),
5832            task: thread.task,
5833            rx,
5834            _marker: PhantomData,
5835        })
5836    }
5837
5838    fn task(&self) -> GuestTaskId {
5839        GuestTaskId(self.task)
5840    }
5841}
5842
5843impl<R> Future for QueuedCall<R>
5844where
5845    R: 'static,
5846{
5847    type Output = Result<R>;
5848
5849    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
5850        check_ambient_store(self.store);
5851        Pin::new(&mut self.rx).poll(cx).map(|result| match result {
5852            Ok(r) => match r.downcast() {
5853                Ok(r) => Ok(*r),
5854                Err(_) => bail_bug!("wrong type of value produced"),
5855            },
5856            Err(oneshot::Canceled) => bail_bug!("channel erroneously dropped"),
5857        })
5858    }
5859}
5860
/// Queue a call previously prepared using `prepare_call` to be run as part of
/// the associated `ComponentInstance`'s event loop.
///
/// `handle` is the guest export to call, `guest_thread` is the thread created
/// for it, and `param_count` is the number of lowered core Wasm parameters.
/// Any error returned by `queue_call` is propagated.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    // Resolve the callback index (if any) to the instance's runtime callback
    // pointer.
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
    // (with signatures appropriate for this call) and will remain valid as
    // long as this instance is valid.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            // NOTE(review): presumably the core result count; confirm against
            // the signature of `queue_call`.
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}